diff options
author | gvit <gvit@ydb.tech> | 2023-06-10 01:01:00 +0300 |
---|---|---|
committer | gvit <gvit@ydb.tech> | 2023-06-10 01:01:00 +0300 |
commit | 91497eb27263e2feb35b53a90773e7207752a2ec (patch) | |
tree | 1ca8afb808fe2bfeece5b0c103cfb849020dfd03 | |
parent | 41e40901a93e04955628042e75367b2662a81542 (diff) | |
download | ydb-91497eb27263e2feb35b53a90773e7207752a2ec.tar.gz |
fix followers
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_data_executer.cpp | 82 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_scan_executer.cpp | 3 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_shards_resolver.cpp | 10 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_shards_resolver.h | 2 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp | 5 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_tasks_graph.h | 1 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_read_actor.cpp | 41 | ||||
-rw-r--r-- | ydb/core/protos/tx_datashard.proto | 1 | ||||
-rw-r--r-- | ydb/core/testlib/basics/helpers.h | 4 | ||||
-rw-r--r-- | ydb/core/testlib/basics/services.cpp | 7 | ||||
-rw-r--r-- | ydb/core/testlib/tablet_helpers.cpp | 4 | ||||
-rw-r--r-- | ydb/core/testlib/tablet_helpers.h | 2 | ||||
-rw-r--r-- | ydb/core/testlib/test_client.cpp | 2 | ||||
-rw-r--r-- | ydb/core/testlib/test_client.h | 2 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_followers.cpp | 72 |
15 files changed, 176 insertions, 62 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index ead0bfda37..7c6936c0f7 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -177,6 +177,31 @@ public: } } + bool ForceAcquireSnapshot() const { + const bool forceSnapshot = ( + ReadOnlyTx && + !ImmediateTx && + !HasPersistentChannels && + (!Database.empty() || AppData()->EnableMvccSnapshotWithLegacyDomainRoot) && + AppData()->FeatureFlags.GetEnableMvccSnapshotReads() + ); + + return forceSnapshot; + } + + bool GetUseFollowers() const { + return ( + // first, we must specify read stale flag. + Request.IsolationLevel == NKikimrKqp::ISOLATION_LEVEL_READ_STALE && + // next, if snapshot is already defined, so in this case followers are not allowed. + !GetSnapshot().IsValid() && + // ensure that followers are allowed only for read only transactions. + ReadOnlyTx && + // if we are forced to acquire snapshot by some reason, so we cannot use followers. + !ForceAcquireSnapshot() + ); + } + void Finalize() { if (LocksBroken) { TString message = "Transaction locks invalidated."; @@ -1131,7 +1156,7 @@ private: LOG_I("Reattach to shard " << tabletId); - Send(MakePipePeNodeCacheID(UseFollowers), new TEvPipeCache::TEvForward( + Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvForward( new TEvDataShard::TEvProposeTransactionAttach(tabletId, TxId), tabletId, /* subscribe */ true), 0, ++shardState->ReattachState.Cookie); } @@ -1502,7 +1527,7 @@ private: TShardState shardState; shardState.State = ImmediateTx ? TShardState::EState::Executing : TShardState::EState::Preparing; shardState.DatashardState.ConstructInPlace(); - shardState.DatashardState->Follower = UseFollowers; + shardState.DatashardState->Follower = GetUseFollowers(); if (Deadline) { TDuration timeout = *Deadline - TAppData::TimeProvider->Now(); @@ -1582,7 +1607,7 @@ private: LOG_D("ExecuteDatashardTransaction traceId.verbosity: " << std::to_string(traceId.GetVerbosity())); - Send(MakePipePeNodeCacheID(UseFollowers), new TEvPipeCache::TEvForward(ev.release(), shardId, true), 0, 0, std::move(traceId)); + Send(MakePipePeNodeCacheID(GetUseFollowers()), new TEvPipeCache::TEvForward(ev.release(), shardId, true), 0, 0, std::move(traceId)); auto result = ShardStates.emplace(shardId, std::move(shardState)); YQL_ENSURE(result.second); @@ -1777,17 +1802,9 @@ private: // Single-shard transactions are always immediate ImmediateTx = (datashardTxs.size() + Request.TopicOperations.GetSize() + readActors) <= 1 && !UnknownAffectedShardCount; - if (ImmediateTx) { - // Transaction cannot be both immediate and volatile - YQL_ENSURE(!VolatileTx); - } - switch (Request.IsolationLevel) { // OnlineRO with AllowInconsistentReads = true case NKikimrKqp::ISOLATION_LEVEL_READ_UNCOMMITTED: - // StaleRO transactions always execute as immediate - // (legacy behavior, for compatibility with current execution engine) - case NKikimrKqp::ISOLATION_LEVEL_READ_STALE: YQL_ENSURE(ReadOnlyTx); YQL_ENSURE(!VolatileTx); ImmediateTx = true; @@ -1797,6 +1814,11 @@ private: break; } + if (ImmediateTx) { + // Transaction cannot be both immediate and volatile + YQL_ENSURE(!VolatileTx); + } + if ((ReadOnlyTx || Request.UseImmediateEffects) && GetSnapshot().IsValid()) { // Snapshot reads are always immediate // Uncommitted writes are executed without coordinators, so they can be immediate @@ -1813,13 +1835,16 @@ private: prepareTasksSpan.End(); } + TasksGraph.GetMeta().UseFollowers = GetUseFollowers(); + if (RemoteComputeTasks) { TSet<ui64> shardIds; for (const auto& [shardId, _] : RemoteComputeTasks) { shardIds.insert(shardId); } - auto kqpShardsResolver = CreateKqpShardsResolver(SelfId(), TxId, std::move(shardIds)); + auto kqpShardsResolver = CreateKqpShardsResolver( + SelfId(), TxId, TasksGraph.GetMeta().UseFollowers, std::move(shardIds)); RegisterWithSameMailbox(kqpShardsResolver); Become(&TKqpDataExecuter::WaitResolveState); } else { @@ -1838,14 +1863,7 @@ private: } void OnShardsResolve() { - const bool forceSnapshot = ( - ReadOnlyTx && - !ImmediateTx && - !HasPersistentChannels && - (!Database.empty() || AppData()->EnableMvccSnapshotWithLegacyDomainRoot) && - AppData()->FeatureFlags.GetEnableMvccSnapshotReads()); - - if (forceSnapshot) { + if (ForceAcquireSnapshot()) { YQL_ENSURE(!VolatileTx); auto longTxService = NLongTxService::MakeLongTxServiceID(SelfId().NodeId()); Send(longTxService, new NLongTxService::TEvLongTxService::TEvAcquireReadSnapshot(Database)); @@ -1896,21 +1914,6 @@ private: using TTopicTabletTxs = THashMap<ui64, NKikimrPQ::TDataTransaction>; void ContinueExecute() { - UseFollowers = Request.IsolationLevel == NKikimrKqp::ISOLATION_LEVEL_READ_STALE; - - if (!ImmediateTx) { - // Followers only allowed for single shard transactions. - // (legacy behaviour, for compatibility with current execution engine) - UseFollowers = false; - } - if (GetSnapshot().IsValid()) { - // TODO: KIKIMR-11912 - UseFollowers = false; - } - if (UseFollowers) { - YQL_ENSURE(ReadOnlyTx); - } - if (Stats) { //Stats->AffectedShards = datashardTxs.size(); Stats->DatashardStats.reserve(DatashardTxs.size()); @@ -2204,7 +2207,7 @@ private: << ", volatile: " << VolatileTx << ", immediate: " << ImmediateTx << ", pending compute tasks" << PendingComputeTasks.size() - << ", useFollowers: " << UseFollowers); + << ", useFollowers: " << GetUseFollowers()); LOG_T("Updating channels after the creation of compute actors"); THashMap<TActorId, THashSet<ui64>> updates; @@ -2245,7 +2248,7 @@ private: LOG_D("Executing KQP transaction on topic tablet: " << tabletId << ", lockTxId: " << lockTxId); - Send(MakePipePeNodeCacheID(UseFollowers), + Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvForward(ev.release(), tabletId, true), 0, 0, @@ -2255,7 +2258,7 @@ private: state.State = ImmediateTx ? TShardState::EState::Executing : TShardState::EState::Preparing; state.DatashardState.ConstructInPlace(); - state.DatashardState->Follower = UseFollowers; + state.DatashardState->Follower = false; state.DatashardState->ShardReadLocks = Request.TopicOperations.TabletHasReadOperations(tabletId); @@ -2274,7 +2277,7 @@ private: Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvUnlink(0)); - if (UseFollowers) { + if (GetUseFollowers()) { Send(MakePipePeNodeCacheID(true), new TEvPipeCache::TEvUnlink(0)); } @@ -2339,7 +2342,6 @@ private: bool ReadOnlyTx = true; bool VolatileTx = false; bool ImmediateTx = false; - bool UseFollowers = false; bool TxPlanned = false; bool LocksBroken = false; diff --git a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp index 711e42f7f6..4aebc7337f 100644 --- a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp @@ -490,7 +490,8 @@ private: } if (shardIds) { LOG_D("Start resolving tablets nodes... (" << shardIds.size() << ")"); - auto kqpShardsResolver = CreateKqpShardsResolver(this->SelfId(), TxId, std::move(shardIds)); + auto kqpShardsResolver = CreateKqpShardsResolver( + this->SelfId(), TxId, false, std::move(shardIds)); KqpShardsResolverId = this->RegisterWithSameMailbox(kqpShardsResolver); } else { GetResourcesSnapshot(); diff --git a/ydb/core/kqp/executer_actor/kqp_shards_resolver.cpp b/ydb/core/kqp/executer_actor/kqp_shards_resolver.cpp index cbc586e4ad..cf9ac91382 100644 --- a/ydb/core/kqp/executer_actor/kqp_shards_resolver.cpp +++ b/ydb/core/kqp/executer_actor/kqp_shards_resolver.cpp @@ -36,11 +36,12 @@ public: } public: - TKqpShardsResolver(const TActorId& owner, ui64 txId, TSet<ui64>&& shardIds) + TKqpShardsResolver(const TActorId& owner, ui64 txId, bool useFollowers, TSet<ui64>&& shardIds) : Owner(owner) , TxId(txId) , ShardIds(std::move(shardIds)) - , TabletResolver(MakePipePeNodeCacheID(false)) + , UseFollowers(useFollowers) + , TabletResolver(MakePipePeNodeCacheID(UseFollowers)) {} void Bootstrap() { @@ -112,6 +113,7 @@ private: const TActorId Owner; const ui64 TxId; const TSet<ui64> ShardIds; + const bool UseFollowers; const TActorId TabletResolver; TMap<ui64, ui32> RetryCount; TMap<ui64, ui64> Result; @@ -119,8 +121,8 @@ private: } // anonymous namespace -IActor* CreateKqpShardsResolver(const TActorId& owner, ui64 txId, TSet<ui64>&& shardIds) { - return new TKqpShardsResolver(owner, txId, std::move(shardIds)); +IActor* CreateKqpShardsResolver(const TActorId& owner, ui64 txId, bool useFollowers, TSet<ui64>&& shardIds) { + return new TKqpShardsResolver(owner, txId, useFollowers, std::move(shardIds)); } } // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/executer_actor/kqp_shards_resolver.h b/ydb/core/kqp/executer_actor/kqp_shards_resolver.h index 720a6086ab..cb0df8421a 100644 --- a/ydb/core/kqp/executer_actor/kqp_shards_resolver.h +++ b/ydb/core/kqp/executer_actor/kqp_shards_resolver.h @@ -5,6 +5,6 @@ namespace NKikimr::NKqp { -NActors::IActor* CreateKqpShardsResolver(const NActors::TActorId& owner, ui64 txId, TSet<ui64>&& shardIds); +NActors::IActor* CreateKqpShardsResolver(const NActors::TActorId& owner, ui64 txId, bool useFollowers, TSet<ui64>&& shardIds); } // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp index 329238bc78..448dfb3953 100644 --- a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp +++ b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp @@ -670,11 +670,16 @@ void AddSnapshotInfoToTaskInputs(const TKqpTasksGraph& tasksGraph, NYql::NDqProt NKikimrTxDataShard::TKqpReadRangesSourceSettings settings; YQL_ENSURE(settingsAny.UnpackTo(&settings), "Failed to unpack settings"); + if (snapshot.IsValid()) { settings.MutableSnapshot()->SetStep(snapshot.Step); settings.MutableSnapshot()->SetTxId(snapshot.TxId); } + if (tasksGraph.GetMeta().UseFollowers) { + settings.SetUseFollowers(tasksGraph.GetMeta().UseFollowers); + } + source->MutableSettings()->PackFrom(settings); } } diff --git a/ydb/core/kqp/executer_actor/kqp_tasks_graph.h b/ydb/core/kqp/executer_actor/kqp_tasks_graph.h index 911a7da01c..f86873dd7c 100644 --- a/ydb/core/kqp/executer_actor/kqp_tasks_graph.h +++ b/ydb/core/kqp/executer_actor/kqp_tasks_graph.h @@ -89,6 +89,7 @@ struct TGraphMeta { IKqpGateway::TKqpSnapshot Snapshot; std::unordered_map<ui64, TActorId> ResultChannelProxies; TActorId ExecuterId; + bool UseFollowers = false; void SetSnapshot(ui64 step, ui64 txId) { Snapshot = IKqpGateway::TKqpSnapshot(step, txId); diff --git a/ydb/core/kqp/runtime/kqp_read_actor.cpp b/ydb/core/kqp/runtime/kqp_read_actor.cpp index 617b458803..065e17f013 100644 --- a/ydb/core/kqp/runtime/kqp_read_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_read_actor.cpp @@ -61,7 +61,8 @@ THolder<NKikimr::TEvDataShard::TEvReadAck> DefaultAckSettings() { return result; } -NActors::TActorId PipeCacheId = NKikimr::MakePipePeNodeCacheID(false); +NActors::TActorId MainPipeCacheId = NKikimr::MakePipePeNodeCacheID(false); +NActors::TActorId FollowersPipeCacheId = NKikimr::MakePipePeNodeCacheID(true); TDuration StartRetryDelay = TDuration::MilliSeconds(250); @@ -104,6 +105,7 @@ public: size_t ResolveAttempt = 0; size_t RetryAttempt = 0; + size_t SuccessBatches = 0; TShardState(ui64 tabletId) : TabletId(tabletId) @@ -360,6 +362,8 @@ public: , HolderFactory(args.HolderFactory) , Alloc(args.Alloc) , Counters(counters) + , UseFollowers(false) + , PipeCacheId(MainPipeCacheId) { TableId = TTableId( Settings.GetTable().GetTableId().GetOwnerId(), @@ -368,6 +372,13 @@ public: Settings.GetTable().GetTableId().GetSchemaVersion() ); + if (Settings.GetUseFollowers() && !Snapshot.IsValid()) { + // reading from followers is allowed only of snapshot is not specified and + // specific flag is set. otherwise we always read from main replicas. + PipeCacheId = FollowersPipeCacheId; + UseFollowers = true; + } + InitResultColumns(); KeyColumnTypes.reserve(Settings.GetKeyColumnTypes().size()); @@ -830,7 +841,7 @@ public: Counters->CreatedIterators->Inc(); ReadIdByTabletId[state->TabletId].push_back(id); - Send(::PipeCacheId, new TEvPipeCache::TEvForward(ev.Release(), state->TabletId, true), + Send(PipeCacheId, new TEvPipeCache::TEvForward(ev.Release(), state->TabletId, true), IEventHandle::FlagTrackDelivery); } @@ -868,9 +879,20 @@ public: CA_LOG_D("read id #" << id << " got issue " << issue.Getmessage()); Reads[id].Shard->Issues.push_back(issue); } + + if (UseFollowers && record.GetStatus().GetCode() != Ydb::StatusIds::SUCCESS && Reads[id].Shard->SuccessBatches > 0) { + // read from follower is interrupted with error after several successful responses. + // in this case read is not safe because we can return inconsistent data. + NYql::TIssues issues; + NYql::IssuesFromMessage(record.GetStatus().GetIssues(), issues); + return RuntimeError("Failed to read from follower", NYql::NDqProto::StatusIds::UNAVAILABLE, issues); + } + switch (record.GetStatus().GetCode()) { - case Ydb::StatusIds::SUCCESS: + case Ydb::StatusIds::SUCCESS: { + Reads[id].Shard->SuccessBatches++; break; + } case Ydb::StatusIds::OVERLOADED: { return RetryRead(id, false); } @@ -940,7 +962,7 @@ public: auto* state = Reads[id].Shard; auto cancel = MakeHolder<TEvDataShard::TEvReadCancel>(); cancel->Record.SetReadId(id); - Send(::PipeCacheId, new TEvPipeCache::TEvForward(cancel.Release(), state->TabletId, false)); + Send(PipeCacheId, new TEvPipeCache::TEvForward(cancel.Release(), state->TabletId, false)); Reads[id].Reset(); ResetReads++; @@ -1152,7 +1174,7 @@ public: } Counters->SentIteratorAcks->Inc(); CA_LOG_D("sending ack for read #" << id << " limit " << limit << " seqno = " << record.GetSeqNo()); - Send(::PipeCacheId, new TEvPipeCache::TEvForward(request.Release(), Reads[id].Shard->TabletId, true), + Send(PipeCacheId, new TEvPipeCache::TEvForward(request.Release(), Reads[id].Shard->TabletId, true), IEventHandle::FlagTrackDelivery); } else { Reads[id].Finished = true; @@ -1235,7 +1257,10 @@ public: for (size_t i = 0; i < Reads.size(); ++i) { ResetRead(i); } - Send(PipeCacheId, new TEvPipeCache::TEvUnlink(0)); + Send(::MainPipeCacheId, new TEvPipeCache::TEvUnlink(0)); + if (UseFollowers) { + Send(::FollowersPipeCacheId, new TEvPipeCache::TEvUnlink(0)); + } } TBase::PassAway(); } @@ -1334,6 +1359,8 @@ private: std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc; TIntrusivePtr<TKqpCounters> Counters; + bool UseFollowers; + NActors::TActorId PipeCacheId; }; @@ -1355,7 +1382,7 @@ void InjectRangeEvReadAckSettings(const NKikimrTxDataShard::TEvReadAck& ack) { } void InterceptReadActorPipeCache(NActors::TActorId id) { - ::PipeCacheId = id; + ::MainPipeCacheId = id; } } // namespace NKqp diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto index 9efbc6adc4..7b42f2f32f 100644 --- a/ydb/core/protos/tx_datashard.proto +++ b/ydb/core/protos/tx_datashard.proto @@ -275,6 +275,7 @@ message TKqpReadRangesSourceSettings { optional uint32 LockNodeId = 14; optional uint64 MaxInFlightShards = 16; + optional bool UseFollowers = 17 [default = false]; } message TKqpTaskInfo { diff --git a/ydb/core/testlib/basics/helpers.h b/ydb/core/testlib/basics/helpers.h index 9658b83714..96c9197e61 100644 --- a/ydb/core/testlib/basics/helpers.h +++ b/ydb/core/testlib/basics/helpers.h @@ -44,7 +44,7 @@ namespace NFake { ui32 nrings, ui32 ringSize, ui64 stateStorageGroup); void SetupBSNodeWarden(TTestActorRuntime& runtime, ui32 nodeIndex, TIntrusivePtr<TNodeWardenConfig> nodeWardenConfig); void SetupTabletResolver(TTestActorRuntime& runtime, ui32 nodeIndex); - void SetupTabletPipePeNodeCaches(TTestActorRuntime& runtime, ui32 nodeIndex); + void SetupTabletPipePeNodeCaches(TTestActorRuntime& runtime, ui32 nodeIndex, bool forceFollowers = false); void SetupResourceBroker(TTestActorRuntime& runtime, ui32 nodeIndex); void SetupSharedPageCache(TTestActorRuntime& runtime, ui32 nodeIndex, NFake::TCaches caches); void SetupNodeWhiteboard(TTestActorRuntime& runtime, ui32 nodeIndex); @@ -57,7 +57,7 @@ namespace NFake { // StateStorage, NodeWarden, TabletResolver, ResourceBroker, SharedPageCache void SetupBasicServices(TTestActorRuntime &runtime, TAppPrepare &app, bool mockDisk = false, - NFake::INode *factory = nullptr, NFake::TStorage storage = {}, NFake::TCaches caches = {}); + NFake::INode *factory = nullptr, NFake::TStorage storage = {}, NFake::TCaches caches = {}, bool forceFollowers = false); /// class TStrandedPDiskServiceFactory : public IPDiskServiceFactory { diff --git a/ydb/core/testlib/basics/services.cpp b/ydb/core/testlib/basics/services.cpp index 9ffba79e19..7cbff912a7 100644 --- a/ydb/core/testlib/basics/services.cpp +++ b/ydb/core/testlib/basics/services.cpp @@ -66,7 +66,7 @@ namespace NPDisk { TActorSetupCmd(tabletResolver, TMailboxType::Revolving, 0), nodeIndex); } - void SetupTabletPipePeNodeCaches(TTestActorRuntime& runtime, ui32 nodeIndex) + void SetupTabletPipePeNodeCaches(TTestActorRuntime& runtime, ui32 nodeIndex, bool forceFollowers) { TIntrusivePtr<TPipePeNodeCacheConfig> leaderPipeConfig = new TPipePeNodeCacheConfig(); leaderPipeConfig->PipeRefreshTime = TDuration::Zero(); @@ -76,6 +76,7 @@ namespace NPDisk { followerPipeConfig->PipeRefreshTime = TDuration::Seconds(30); followerPipeConfig->PipeConfig.AllowFollower = true; followerPipeConfig->PipeConfig.RetryPolicy = {.RetryLimitCount = 3}; + followerPipeConfig->PipeConfig.ForceFollower = forceFollowers; runtime.AddLocalService(MakePipePeNodeCacheID(false), TActorSetupCmd(CreatePipePeNodeCache(leaderPipeConfig), TMailboxType::Revolving, 0), nodeIndex); @@ -310,7 +311,7 @@ namespace NPDisk { } void SetupBasicServices(TTestActorRuntime& runtime, TAppPrepare& app, bool mock, - NFake::INode* factory, NFake::TStorage storage, NFake::TCaches caches) + NFake::INode* factory, NFake::TStorage storage, NFake::TCaches caches, bool forceFollowers) { runtime.SetDispatchTimeout(storage.UseDisk ? DISK_DISPATCH_TIMEOUT : DEFAULT_DISPATCH_TIMEOUT); @@ -336,7 +337,7 @@ namespace NPDisk { SetupBSNodeWarden(runtime, nodeIndex, disk.MakeWardenConf(*app.Domains, keyConfig)); SetupTabletResolver(runtime, nodeIndex); - SetupTabletPipePeNodeCaches(runtime, nodeIndex); + SetupTabletPipePeNodeCaches(runtime, nodeIndex, forceFollowers); SetupResourceBroker(runtime, nodeIndex); SetupSharedPageCache(runtime, nodeIndex, caches); SetupBlobCache(runtime, nodeIndex); diff --git a/ydb/core/testlib/tablet_helpers.cpp b/ydb/core/testlib/tablet_helpers.cpp index d52aac91e2..76e0aed50d 100644 --- a/ydb/core/testlib/tablet_helpers.cpp +++ b/ydb/core/testlib/tablet_helpers.cpp @@ -618,13 +618,13 @@ namespace NKikimr { } void SetupTabletServices(TTestActorRuntime &runtime, TAppPrepare *app, bool mockDisk, NFake::TStorage storage, - NFake::TCaches caches) { + NFake::TCaches caches, bool forceFollowers) { TAutoPtr<TAppPrepare> dummy; if (app == nullptr) { dummy = app = new TAppPrepare; } TUltimateNodes nodes(runtime, app); - SetupBasicServices(runtime, *app, mockDisk, &nodes, storage, caches); + SetupBasicServices(runtime, *app, mockDisk, &nodes, storage, caches, forceFollowers); } TDomainsInfo::TDomain::TStoragePoolKinds DefaultPoolKinds(ui32 count) { diff --git a/ydb/core/testlib/tablet_helpers.h b/ydb/core/testlib/tablet_helpers.h index 81a7c614aa..9e3616148e 100644 --- a/ydb/core/testlib/tablet_helpers.h +++ b/ydb/core/testlib/tablet_helpers.h @@ -26,7 +26,7 @@ namespace NKikimr { void RebootTablet(TTestActorRuntime& runtime, ui64 tabletId, const TActorId& sender, ui32 nodeIndex = 0, bool sysTablet = false); void GracefulRestartTablet(TTestActorRuntime& runtime, ui64 tabletId, const TActorId& sender, ui32 nodeIndex = 0); void SetupTabletServices(TTestActorRuntime& runtime, TAppPrepare* app = nullptr, bool mockDisk = false, - NFake::TStorage storage = {}, NFake::TCaches caches = {}); + NFake::TStorage storage = {}, NFake::TCaches caches = {}, bool forceFollowers = false); const TString DEFAULT_STORAGE_POOL = "Storage Pool with id: 1"; diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp index 046413257e..ce8e8a66c1 100644 --- a/ydb/core/testlib/test_client.cpp +++ b/ydb/core/testlib/test_client.cpp @@ -238,7 +238,7 @@ namespace Tests { }); const bool mockDisk = (StaticNodes() + DynamicNodes()) == 1 && Settings->EnableMockOnSingleNode; - SetupTabletServices(*Runtime, &app, mockDisk, Settings->CustomDiskParams, Settings->CacheParams); + SetupTabletServices(*Runtime, &app, mockDisk, Settings->CustomDiskParams, Settings->CacheParams, Settings->EnableForceFollowers); // WARNING: must be careful about modifying app data after actor system starts diff --git a/ydb/core/testlib/test_client.h b/ydb/core/testlib/test_client.h index 2c4def02ad..5cb51220fa 100644 --- a/ydb/core/testlib/test_client.h +++ b/ydb/core/testlib/test_client.h @@ -115,6 +115,7 @@ namespace Tests { TLoggerInitializer LoggerInitializer; TStoragePoolKinds StoragePoolTypes; TVector<NKikimrKqp::TKqpSetting> KqpSettings; + bool EnableForceFollowers = false; bool EnableConsole = true; bool EnableNodeBroker = false; bool EnableConfigsDispatcher = true; @@ -164,6 +165,7 @@ namespace Tests { TServerSettings& InitKikimrRunConfig() { KikimrRunConfig = std::make_shared<TKikimrRunConfig>(AppConfig); return *this; } TServerSettings& SetKeyFor(ui32 nodeId, TString keyValue) { NodeKeys[nodeId] = keyValue; return *this; } TServerSettings& SetEnableKqpSpilling(bool value) { EnableKqpSpilling = value; return *this; } + TServerSettings& SetEnableForceFollowers(bool value) { EnableForceFollowers = value; return *this; } TServerSettings& SetDomainPlanResolution(ui64 resolution) { DomainPlanResolution = resolution; return *this; } TServerSettings& SetFeatureFlags(const NKikimrConfig::TFeatureFlags& value) { FeatureFlags = value; return *this; } TServerSettings& SetCompactionConfig(const NKikimrConfig::TCompactionConfig& value) { CompactionConfig = value; return *this; } diff --git a/ydb/core/tx/datashard/datashard_ut_followers.cpp b/ydb/core/tx/datashard/datashard_ut_followers.cpp index ea21a77677..33a34d78a1 100644 --- a/ydb/core/tx/datashard/datashard_ut_followers.cpp +++ b/ydb/core/tx/datashard/datashard_ut_followers.cpp @@ -89,6 +89,78 @@ Y_UNIT_TEST_SUITE(DataShardFollowers) { } } + Y_UNIT_TEST(FollowerStaleRo) { + TPortManager pm; + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetUseRealThreads(false) + .SetEnableForceFollowers(true); + + Tests::TServer::TPtr server = new TServer(serverSettings); + auto &runtime = *server->GetRuntime(); + auto sender = runtime.AllocateEdgeActor(); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG); + + InitRoot(server, sender); + + CreateShardedTable(server, sender, "/Root", "table-1", + TShardedTableOptions() + .Shards(2) + .Followers(1)); + + ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1), (2, 2), (3, 3);"); + + bool dropFollowerUpdates = true; + + { + auto result = KqpSimpleStaleRoExec(runtime, "SELECT * FROM `/Root/table-1`", "/Root"); + TString expected = "{ items { uint32_value: 1 } items { uint32_value: 1 } }, " + "{ items { uint32_value: 2 } items { uint32_value: 2 } }, " + "{ items { uint32_value: 3 } items { uint32_value: 3 } }"; + UNIT_ASSERT_VALUES_EQUAL(result, expected); + } + + std::vector<TAutoPtr<IEventHandle>> capturedUpdates; + auto captureEvents = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle> &ev) { + if (ev->GetTypeRewrite() == NKikimr::TEvTablet::TEvFollowerUpdate::EventType || + ev->GetTypeRewrite() == NKikimr::TEvTablet::TEvFollowerAuxUpdate::EventType || + ev->GetTypeRewrite() == NKikimr::TEvTablet::TEvFUpdate::EventType || + ev->GetTypeRewrite() == NKikimr::TEvTablet::TEvFAuxUpdate::EventType) + { + + if (dropFollowerUpdates) { + capturedUpdates.emplace_back(ev); + return true; + } + Cerr << "Followers update " << capturedUpdates.size() << Endl; + } + + return false; + }; + + // blocking followers from new log updates. + runtime.SetEventFilter(captureEvents); + + ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 4), (2, 5), (3, 6);"); + + { + auto result = KqpSimpleStaleRoExec(runtime, "SELECT * FROM `/Root/table-1` where key = 1", "/Root"); + TString expected = "{ items { uint32_value: 1 } items { uint32_value: 1 } }"; + UNIT_ASSERT_VALUES_EQUAL(result, expected); + } + + { + // multiple shards, always read from main tablets. + auto result = KqpSimpleStaleRoExec(runtime, "SELECT * FROM `/Root/table-1`", "/Root"); + TString expected = "{ items { uint32_value: 1 } items { uint32_value: 4 } }, " + "{ items { uint32_value: 2 } items { uint32_value: 5 } }, " + "{ items { uint32_value: 3 } items { uint32_value: 6 } }"; + UNIT_ASSERT_VALUES_EQUAL(result, expected); + } + } + } // Y_UNIT_TEST_SUITE(DataShardFollowers) } // namespace NKikimr |