aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author gvit <gvit@ydb.tech> 2023-06-10 01:01:00 +0300
committer gvit <gvit@ydb.tech> 2023-06-10 01:01:00 +0300
commit 91497eb27263e2feb35b53a90773e7207752a2ec (patch)
tree 1ca8afb808fe2bfeece5b0c103cfb849020dfd03
parent 41e40901a93e04955628042e75367b2662a81542 (diff)
download ydb-91497eb27263e2feb35b53a90773e7207752a2ec.tar.gz
fix followers
-rw-r--r-- ydb/core/kqp/executer_actor/kqp_data_executer.cpp 82
-rw-r--r-- ydb/core/kqp/executer_actor/kqp_scan_executer.cpp 3
-rw-r--r-- ydb/core/kqp/executer_actor/kqp_shards_resolver.cpp 10
-rw-r--r-- ydb/core/kqp/executer_actor/kqp_shards_resolver.h 2
-rw-r--r-- ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp 5
-rw-r--r-- ydb/core/kqp/executer_actor/kqp_tasks_graph.h 1
-rw-r--r-- ydb/core/kqp/runtime/kqp_read_actor.cpp 41
-rw-r--r-- ydb/core/protos/tx_datashard.proto 1
-rw-r--r-- ydb/core/testlib/basics/helpers.h 4
-rw-r--r-- ydb/core/testlib/basics/services.cpp 7
-rw-r--r-- ydb/core/testlib/tablet_helpers.cpp 4
-rw-r--r-- ydb/core/testlib/tablet_helpers.h 2
-rw-r--r-- ydb/core/testlib/test_client.cpp 2
-rw-r--r-- ydb/core/testlib/test_client.h 2
-rw-r--r-- ydb/core/tx/datashard/datashard_ut_followers.cpp 72
15 files changed, 176 insertions, 62 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
index ead0bfda37..7c6936c0f7 100644
--- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
@@ -177,6 +177,31 @@ public:
}
}
+ bool ForceAcquireSnapshot() const {
+ const bool forceSnapshot = (
+ ReadOnlyTx &&
+ !ImmediateTx &&
+ !HasPersistentChannels &&
+ (!Database.empty() || AppData()->EnableMvccSnapshotWithLegacyDomainRoot) &&
+ AppData()->FeatureFlags.GetEnableMvccSnapshotReads()
+ );
+
+ return forceSnapshot;
+ }
+
+ bool GetUseFollowers() const {
+ return (
+ // first, the request must use the stale read isolation level.
+ Request.IsolationLevel == NKikimrKqp::ISOLATION_LEVEL_READ_STALE &&
+ // next, followers are not allowed if a snapshot is already defined.
+ !GetSnapshot().IsValid() &&
+ // ensure that followers are allowed only for read only transactions.
+ ReadOnlyTx &&
+ // if we are forced to acquire a snapshot for some reason, we cannot use followers.
+ !ForceAcquireSnapshot()
+ );
+ }
+
void Finalize() {
if (LocksBroken) {
TString message = "Transaction locks invalidated.";
@@ -1131,7 +1156,7 @@ private:
LOG_I("Reattach to shard " << tabletId);
- Send(MakePipePeNodeCacheID(UseFollowers), new TEvPipeCache::TEvForward(
+ Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvForward(
new TEvDataShard::TEvProposeTransactionAttach(tabletId, TxId),
tabletId, /* subscribe */ true), 0, ++shardState->ReattachState.Cookie);
}
@@ -1502,7 +1527,7 @@ private:
TShardState shardState;
shardState.State = ImmediateTx ? TShardState::EState::Executing : TShardState::EState::Preparing;
shardState.DatashardState.ConstructInPlace();
- shardState.DatashardState->Follower = UseFollowers;
+ shardState.DatashardState->Follower = GetUseFollowers();
if (Deadline) {
TDuration timeout = *Deadline - TAppData::TimeProvider->Now();
@@ -1582,7 +1607,7 @@ private:
LOG_D("ExecuteDatashardTransaction traceId.verbosity: " << std::to_string(traceId.GetVerbosity()));
- Send(MakePipePeNodeCacheID(UseFollowers), new TEvPipeCache::TEvForward(ev.release(), shardId, true), 0, 0, std::move(traceId));
+ Send(MakePipePeNodeCacheID(GetUseFollowers()), new TEvPipeCache::TEvForward(ev.release(), shardId, true), 0, 0, std::move(traceId));
auto result = ShardStates.emplace(shardId, std::move(shardState));
YQL_ENSURE(result.second);
@@ -1777,17 +1802,9 @@ private:
// Single-shard transactions are always immediate
ImmediateTx = (datashardTxs.size() + Request.TopicOperations.GetSize() + readActors) <= 1 && !UnknownAffectedShardCount;
- if (ImmediateTx) {
- // Transaction cannot be both immediate and volatile
- YQL_ENSURE(!VolatileTx);
- }
-
switch (Request.IsolationLevel) {
// OnlineRO with AllowInconsistentReads = true
case NKikimrKqp::ISOLATION_LEVEL_READ_UNCOMMITTED:
- // StaleRO transactions always execute as immediate
- // (legacy behavior, for compatibility with current execution engine)
- case NKikimrKqp::ISOLATION_LEVEL_READ_STALE:
YQL_ENSURE(ReadOnlyTx);
YQL_ENSURE(!VolatileTx);
ImmediateTx = true;
@@ -1797,6 +1814,11 @@ private:
break;
}
+ if (ImmediateTx) {
+ // Transaction cannot be both immediate and volatile
+ YQL_ENSURE(!VolatileTx);
+ }
+
if ((ReadOnlyTx || Request.UseImmediateEffects) && GetSnapshot().IsValid()) {
// Snapshot reads are always immediate
// Uncommitted writes are executed without coordinators, so they can be immediate
@@ -1813,13 +1835,16 @@ private:
prepareTasksSpan.End();
}
+ TasksGraph.GetMeta().UseFollowers = GetUseFollowers();
+
if (RemoteComputeTasks) {
TSet<ui64> shardIds;
for (const auto& [shardId, _] : RemoteComputeTasks) {
shardIds.insert(shardId);
}
- auto kqpShardsResolver = CreateKqpShardsResolver(SelfId(), TxId, std::move(shardIds));
+ auto kqpShardsResolver = CreateKqpShardsResolver(
+ SelfId(), TxId, TasksGraph.GetMeta().UseFollowers, std::move(shardIds));
RegisterWithSameMailbox(kqpShardsResolver);
Become(&TKqpDataExecuter::WaitResolveState);
} else {
@@ -1838,14 +1863,7 @@ private:
}
void OnShardsResolve() {
- const bool forceSnapshot = (
- ReadOnlyTx &&
- !ImmediateTx &&
- !HasPersistentChannels &&
- (!Database.empty() || AppData()->EnableMvccSnapshotWithLegacyDomainRoot) &&
- AppData()->FeatureFlags.GetEnableMvccSnapshotReads());
-
- if (forceSnapshot) {
+ if (ForceAcquireSnapshot()) {
YQL_ENSURE(!VolatileTx);
auto longTxService = NLongTxService::MakeLongTxServiceID(SelfId().NodeId());
Send(longTxService, new NLongTxService::TEvLongTxService::TEvAcquireReadSnapshot(Database));
@@ -1896,21 +1914,6 @@ private:
using TTopicTabletTxs = THashMap<ui64, NKikimrPQ::TDataTransaction>;
void ContinueExecute() {
- UseFollowers = Request.IsolationLevel == NKikimrKqp::ISOLATION_LEVEL_READ_STALE;
-
- if (!ImmediateTx) {
- // Followers only allowed for single shard transactions.
- // (legacy behaviour, for compatibility with current execution engine)
- UseFollowers = false;
- }
- if (GetSnapshot().IsValid()) {
- // TODO: KIKIMR-11912
- UseFollowers = false;
- }
- if (UseFollowers) {
- YQL_ENSURE(ReadOnlyTx);
- }
-
if (Stats) {
//Stats->AffectedShards = datashardTxs.size();
Stats->DatashardStats.reserve(DatashardTxs.size());
@@ -2204,7 +2207,7 @@ private:
<< ", volatile: " << VolatileTx
<< ", immediate: " << ImmediateTx
<< ", pending compute tasks" << PendingComputeTasks.size()
- << ", useFollowers: " << UseFollowers);
+ << ", useFollowers: " << GetUseFollowers());
LOG_T("Updating channels after the creation of compute actors");
THashMap<TActorId, THashSet<ui64>> updates;
@@ -2245,7 +2248,7 @@ private:
LOG_D("Executing KQP transaction on topic tablet: " << tabletId
<< ", lockTxId: " << lockTxId);
- Send(MakePipePeNodeCacheID(UseFollowers),
+ Send(MakePipePeNodeCacheID(false),
new TEvPipeCache::TEvForward(ev.release(), tabletId, true),
0,
0,
@@ -2255,7 +2258,7 @@ private:
state.State =
ImmediateTx ? TShardState::EState::Executing : TShardState::EState::Preparing;
state.DatashardState.ConstructInPlace();
- state.DatashardState->Follower = UseFollowers;
+ state.DatashardState->Follower = false;
state.DatashardState->ShardReadLocks = Request.TopicOperations.TabletHasReadOperations(tabletId);
@@ -2274,7 +2277,7 @@ private:
Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvUnlink(0));
- if (UseFollowers) {
+ if (GetUseFollowers()) {
Send(MakePipePeNodeCacheID(true), new TEvPipeCache::TEvUnlink(0));
}
@@ -2339,7 +2342,6 @@ private:
bool ReadOnlyTx = true;
bool VolatileTx = false;
bool ImmediateTx = false;
- bool UseFollowers = false;
bool TxPlanned = false;
bool LocksBroken = false;
diff --git a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
index 711e42f7f6..4aebc7337f 100644
--- a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
@@ -490,7 +490,8 @@ private:
}
if (shardIds) {
LOG_D("Start resolving tablets nodes... (" << shardIds.size() << ")");
- auto kqpShardsResolver = CreateKqpShardsResolver(this->SelfId(), TxId, std::move(shardIds));
+ auto kqpShardsResolver = CreateKqpShardsResolver(
+ this->SelfId(), TxId, false, std::move(shardIds));
KqpShardsResolverId = this->RegisterWithSameMailbox(kqpShardsResolver);
} else {
GetResourcesSnapshot();
diff --git a/ydb/core/kqp/executer_actor/kqp_shards_resolver.cpp b/ydb/core/kqp/executer_actor/kqp_shards_resolver.cpp
index cbc586e4ad..cf9ac91382 100644
--- a/ydb/core/kqp/executer_actor/kqp_shards_resolver.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_shards_resolver.cpp
@@ -36,11 +36,12 @@ public:
}
public:
- TKqpShardsResolver(const TActorId& owner, ui64 txId, TSet<ui64>&& shardIds)
+ TKqpShardsResolver(const TActorId& owner, ui64 txId, bool useFollowers, TSet<ui64>&& shardIds)
: Owner(owner)
, TxId(txId)
, ShardIds(std::move(shardIds))
- , TabletResolver(MakePipePeNodeCacheID(false))
+ , UseFollowers(useFollowers)
+ , TabletResolver(MakePipePeNodeCacheID(UseFollowers))
{}
void Bootstrap() {
@@ -112,6 +113,7 @@ private:
const TActorId Owner;
const ui64 TxId;
const TSet<ui64> ShardIds;
+ const bool UseFollowers;
const TActorId TabletResolver;
TMap<ui64, ui32> RetryCount;
TMap<ui64, ui64> Result;
@@ -119,8 +121,8 @@ private:
} // anonymous namespace
-IActor* CreateKqpShardsResolver(const TActorId& owner, ui64 txId, TSet<ui64>&& shardIds) {
- return new TKqpShardsResolver(owner, txId, std::move(shardIds));
+IActor* CreateKqpShardsResolver(const TActorId& owner, ui64 txId, bool useFollowers, TSet<ui64>&& shardIds) {
+ return new TKqpShardsResolver(owner, txId, useFollowers, std::move(shardIds));
}
} // namespace NKikimr::NKqp
diff --git a/ydb/core/kqp/executer_actor/kqp_shards_resolver.h b/ydb/core/kqp/executer_actor/kqp_shards_resolver.h
index 720a6086ab..cb0df8421a 100644
--- a/ydb/core/kqp/executer_actor/kqp_shards_resolver.h
+++ b/ydb/core/kqp/executer_actor/kqp_shards_resolver.h
@@ -5,6 +5,6 @@
namespace NKikimr::NKqp {
-NActors::IActor* CreateKqpShardsResolver(const NActors::TActorId& owner, ui64 txId, TSet<ui64>&& shardIds);
+NActors::IActor* CreateKqpShardsResolver(const NActors::TActorId& owner, ui64 txId, bool useFollowers, TSet<ui64>&& shardIds);
} // namespace NKikimr::NKqp
diff --git a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp
index 329238bc78..448dfb3953 100644
--- a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp
@@ -670,11 +670,16 @@ void AddSnapshotInfoToTaskInputs(const TKqpTasksGraph& tasksGraph, NYql::NDqProt
NKikimrTxDataShard::TKqpReadRangesSourceSettings settings;
YQL_ENSURE(settingsAny.UnpackTo(&settings), "Failed to unpack settings");
+
if (snapshot.IsValid()) {
settings.MutableSnapshot()->SetStep(snapshot.Step);
settings.MutableSnapshot()->SetTxId(snapshot.TxId);
}
+ if (tasksGraph.GetMeta().UseFollowers) {
+ settings.SetUseFollowers(tasksGraph.GetMeta().UseFollowers);
+ }
+
source->MutableSettings()->PackFrom(settings);
}
}
diff --git a/ydb/core/kqp/executer_actor/kqp_tasks_graph.h b/ydb/core/kqp/executer_actor/kqp_tasks_graph.h
index 911a7da01c..f86873dd7c 100644
--- a/ydb/core/kqp/executer_actor/kqp_tasks_graph.h
+++ b/ydb/core/kqp/executer_actor/kqp_tasks_graph.h
@@ -89,6 +89,7 @@ struct TGraphMeta {
IKqpGateway::TKqpSnapshot Snapshot;
std::unordered_map<ui64, TActorId> ResultChannelProxies;
TActorId ExecuterId;
+ bool UseFollowers = false;
void SetSnapshot(ui64 step, ui64 txId) {
Snapshot = IKqpGateway::TKqpSnapshot(step, txId);
diff --git a/ydb/core/kqp/runtime/kqp_read_actor.cpp b/ydb/core/kqp/runtime/kqp_read_actor.cpp
index 617b458803..065e17f013 100644
--- a/ydb/core/kqp/runtime/kqp_read_actor.cpp
+++ b/ydb/core/kqp/runtime/kqp_read_actor.cpp
@@ -61,7 +61,8 @@ THolder<NKikimr::TEvDataShard::TEvReadAck> DefaultAckSettings() {
return result;
}
-NActors::TActorId PipeCacheId = NKikimr::MakePipePeNodeCacheID(false);
+NActors::TActorId MainPipeCacheId = NKikimr::MakePipePeNodeCacheID(false);
+NActors::TActorId FollowersPipeCacheId = NKikimr::MakePipePeNodeCacheID(true);
TDuration StartRetryDelay = TDuration::MilliSeconds(250);
@@ -104,6 +105,7 @@ public:
size_t ResolveAttempt = 0;
size_t RetryAttempt = 0;
+ size_t SuccessBatches = 0;
TShardState(ui64 tabletId)
: TabletId(tabletId)
@@ -360,6 +362,8 @@ public:
, HolderFactory(args.HolderFactory)
, Alloc(args.Alloc)
, Counters(counters)
+ , UseFollowers(false)
+ , PipeCacheId(MainPipeCacheId)
{
TableId = TTableId(
Settings.GetTable().GetTableId().GetOwnerId(),
@@ -368,6 +372,13 @@ public:
Settings.GetTable().GetTableId().GetSchemaVersion()
);
+ if (Settings.GetUseFollowers() && !Snapshot.IsValid()) {
+ // reading from followers is allowed only if a snapshot is not specified and
+ // the UseFollowers setting is set; otherwise we always read from main replicas.
+ PipeCacheId = FollowersPipeCacheId;
+ UseFollowers = true;
+ }
+
InitResultColumns();
KeyColumnTypes.reserve(Settings.GetKeyColumnTypes().size());
@@ -830,7 +841,7 @@ public:
Counters->CreatedIterators->Inc();
ReadIdByTabletId[state->TabletId].push_back(id);
- Send(::PipeCacheId, new TEvPipeCache::TEvForward(ev.Release(), state->TabletId, true),
+ Send(PipeCacheId, new TEvPipeCache::TEvForward(ev.Release(), state->TabletId, true),
IEventHandle::FlagTrackDelivery);
}
@@ -868,9 +879,20 @@ public:
CA_LOG_D("read id #" << id << " got issue " << issue.Getmessage());
Reads[id].Shard->Issues.push_back(issue);
}
+
+ if (UseFollowers && record.GetStatus().GetCode() != Ydb::StatusIds::SUCCESS && Reads[id].Shard->SuccessBatches > 0) {
+ // a read from a follower failed with an error after at least one successful response.
+ // in this case the read is not safe because we could return inconsistent data.
+ NYql::TIssues issues;
+ NYql::IssuesFromMessage(record.GetStatus().GetIssues(), issues);
+ return RuntimeError("Failed to read from follower", NYql::NDqProto::StatusIds::UNAVAILABLE, issues);
+ }
+
switch (record.GetStatus().GetCode()) {
- case Ydb::StatusIds::SUCCESS:
+ case Ydb::StatusIds::SUCCESS: {
+ Reads[id].Shard->SuccessBatches++;
break;
+ }
case Ydb::StatusIds::OVERLOADED: {
return RetryRead(id, false);
}
@@ -940,7 +962,7 @@ public:
auto* state = Reads[id].Shard;
auto cancel = MakeHolder<TEvDataShard::TEvReadCancel>();
cancel->Record.SetReadId(id);
- Send(::PipeCacheId, new TEvPipeCache::TEvForward(cancel.Release(), state->TabletId, false));
+ Send(PipeCacheId, new TEvPipeCache::TEvForward(cancel.Release(), state->TabletId, false));
Reads[id].Reset();
ResetReads++;
@@ -1152,7 +1174,7 @@ public:
}
Counters->SentIteratorAcks->Inc();
CA_LOG_D("sending ack for read #" << id << " limit " << limit << " seqno = " << record.GetSeqNo());
- Send(::PipeCacheId, new TEvPipeCache::TEvForward(request.Release(), Reads[id].Shard->TabletId, true),
+ Send(PipeCacheId, new TEvPipeCache::TEvForward(request.Release(), Reads[id].Shard->TabletId, true),
IEventHandle::FlagTrackDelivery);
} else {
Reads[id].Finished = true;
@@ -1235,7 +1257,10 @@ public:
for (size_t i = 0; i < Reads.size(); ++i) {
ResetRead(i);
}
- Send(PipeCacheId, new TEvPipeCache::TEvUnlink(0));
+ Send(::MainPipeCacheId, new TEvPipeCache::TEvUnlink(0));
+ if (UseFollowers) {
+ Send(::FollowersPipeCacheId, new TEvPipeCache::TEvUnlink(0));
+ }
}
TBase::PassAway();
}
@@ -1334,6 +1359,8 @@ private:
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc;
TIntrusivePtr<TKqpCounters> Counters;
+ bool UseFollowers;
+ NActors::TActorId PipeCacheId;
};
@@ -1355,7 +1382,7 @@ void InjectRangeEvReadAckSettings(const NKikimrTxDataShard::TEvReadAck& ack) {
}
void InterceptReadActorPipeCache(NActors::TActorId id) {
- ::PipeCacheId = id;
+ ::MainPipeCacheId = id;
}
} // namespace NKqp
diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto
index 9efbc6adc4..7b42f2f32f 100644
--- a/ydb/core/protos/tx_datashard.proto
+++ b/ydb/core/protos/tx_datashard.proto
@@ -275,6 +275,7 @@ message TKqpReadRangesSourceSettings {
optional uint32 LockNodeId = 14;
optional uint64 MaxInFlightShards = 16;
+ optional bool UseFollowers = 17 [default = false];
}
message TKqpTaskInfo {
diff --git a/ydb/core/testlib/basics/helpers.h b/ydb/core/testlib/basics/helpers.h
index 9658b83714..96c9197e61 100644
--- a/ydb/core/testlib/basics/helpers.h
+++ b/ydb/core/testlib/basics/helpers.h
@@ -44,7 +44,7 @@ namespace NFake {
ui32 nrings, ui32 ringSize, ui64 stateStorageGroup);
void SetupBSNodeWarden(TTestActorRuntime& runtime, ui32 nodeIndex, TIntrusivePtr<TNodeWardenConfig> nodeWardenConfig);
void SetupTabletResolver(TTestActorRuntime& runtime, ui32 nodeIndex);
- void SetupTabletPipePeNodeCaches(TTestActorRuntime& runtime, ui32 nodeIndex);
+ void SetupTabletPipePeNodeCaches(TTestActorRuntime& runtime, ui32 nodeIndex, bool forceFollowers = false);
void SetupResourceBroker(TTestActorRuntime& runtime, ui32 nodeIndex);
void SetupSharedPageCache(TTestActorRuntime& runtime, ui32 nodeIndex, NFake::TCaches caches);
void SetupNodeWhiteboard(TTestActorRuntime& runtime, ui32 nodeIndex);
@@ -57,7 +57,7 @@ namespace NFake {
// StateStorage, NodeWarden, TabletResolver, ResourceBroker, SharedPageCache
void SetupBasicServices(TTestActorRuntime &runtime, TAppPrepare &app, bool mockDisk = false,
- NFake::INode *factory = nullptr, NFake::TStorage storage = {}, NFake::TCaches caches = {});
+ NFake::INode *factory = nullptr, NFake::TStorage storage = {}, NFake::TCaches caches = {}, bool forceFollowers = false);
///
class TStrandedPDiskServiceFactory : public IPDiskServiceFactory {
diff --git a/ydb/core/testlib/basics/services.cpp b/ydb/core/testlib/basics/services.cpp
index 9ffba79e19..7cbff912a7 100644
--- a/ydb/core/testlib/basics/services.cpp
+++ b/ydb/core/testlib/basics/services.cpp
@@ -66,7 +66,7 @@ namespace NPDisk {
TActorSetupCmd(tabletResolver, TMailboxType::Revolving, 0), nodeIndex);
}
- void SetupTabletPipePeNodeCaches(TTestActorRuntime& runtime, ui32 nodeIndex)
+ void SetupTabletPipePeNodeCaches(TTestActorRuntime& runtime, ui32 nodeIndex, bool forceFollowers)
{
TIntrusivePtr<TPipePeNodeCacheConfig> leaderPipeConfig = new TPipePeNodeCacheConfig();
leaderPipeConfig->PipeRefreshTime = TDuration::Zero();
@@ -76,6 +76,7 @@ namespace NPDisk {
followerPipeConfig->PipeRefreshTime = TDuration::Seconds(30);
followerPipeConfig->PipeConfig.AllowFollower = true;
followerPipeConfig->PipeConfig.RetryPolicy = {.RetryLimitCount = 3};
+ followerPipeConfig->PipeConfig.ForceFollower = forceFollowers;
runtime.AddLocalService(MakePipePeNodeCacheID(false),
TActorSetupCmd(CreatePipePeNodeCache(leaderPipeConfig), TMailboxType::Revolving, 0), nodeIndex);
@@ -310,7 +311,7 @@ namespace NPDisk {
}
void SetupBasicServices(TTestActorRuntime& runtime, TAppPrepare& app, bool mock,
- NFake::INode* factory, NFake::TStorage storage, NFake::TCaches caches)
+ NFake::INode* factory, NFake::TStorage storage, NFake::TCaches caches, bool forceFollowers)
{
runtime.SetDispatchTimeout(storage.UseDisk ? DISK_DISPATCH_TIMEOUT : DEFAULT_DISPATCH_TIMEOUT);
@@ -336,7 +337,7 @@ namespace NPDisk {
SetupBSNodeWarden(runtime, nodeIndex, disk.MakeWardenConf(*app.Domains, keyConfig));
SetupTabletResolver(runtime, nodeIndex);
- SetupTabletPipePeNodeCaches(runtime, nodeIndex);
+ SetupTabletPipePeNodeCaches(runtime, nodeIndex, forceFollowers);
SetupResourceBroker(runtime, nodeIndex);
SetupSharedPageCache(runtime, nodeIndex, caches);
SetupBlobCache(runtime, nodeIndex);
diff --git a/ydb/core/testlib/tablet_helpers.cpp b/ydb/core/testlib/tablet_helpers.cpp
index d52aac91e2..76e0aed50d 100644
--- a/ydb/core/testlib/tablet_helpers.cpp
+++ b/ydb/core/testlib/tablet_helpers.cpp
@@ -618,13 +618,13 @@ namespace NKikimr {
}
void SetupTabletServices(TTestActorRuntime &runtime, TAppPrepare *app, bool mockDisk, NFake::TStorage storage,
- NFake::TCaches caches) {
+ NFake::TCaches caches, bool forceFollowers) {
TAutoPtr<TAppPrepare> dummy;
if (app == nullptr) {
dummy = app = new TAppPrepare;
}
TUltimateNodes nodes(runtime, app);
- SetupBasicServices(runtime, *app, mockDisk, &nodes, storage, caches);
+ SetupBasicServices(runtime, *app, mockDisk, &nodes, storage, caches, forceFollowers);
}
TDomainsInfo::TDomain::TStoragePoolKinds DefaultPoolKinds(ui32 count) {
diff --git a/ydb/core/testlib/tablet_helpers.h b/ydb/core/testlib/tablet_helpers.h
index 81a7c614aa..9e3616148e 100644
--- a/ydb/core/testlib/tablet_helpers.h
+++ b/ydb/core/testlib/tablet_helpers.h
@@ -26,7 +26,7 @@ namespace NKikimr {
void RebootTablet(TTestActorRuntime& runtime, ui64 tabletId, const TActorId& sender, ui32 nodeIndex = 0, bool sysTablet = false);
void GracefulRestartTablet(TTestActorRuntime& runtime, ui64 tabletId, const TActorId& sender, ui32 nodeIndex = 0);
void SetupTabletServices(TTestActorRuntime& runtime, TAppPrepare* app = nullptr, bool mockDisk = false,
- NFake::TStorage storage = {}, NFake::TCaches caches = {});
+ NFake::TStorage storage = {}, NFake::TCaches caches = {}, bool forceFollowers = false);
const TString DEFAULT_STORAGE_POOL = "Storage Pool with id: 1";
diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp
index 046413257e..ce8e8a66c1 100644
--- a/ydb/core/testlib/test_client.cpp
+++ b/ydb/core/testlib/test_client.cpp
@@ -238,7 +238,7 @@ namespace Tests {
});
const bool mockDisk = (StaticNodes() + DynamicNodes()) == 1 && Settings->EnableMockOnSingleNode;
- SetupTabletServices(*Runtime, &app, mockDisk, Settings->CustomDiskParams, Settings->CacheParams);
+ SetupTabletServices(*Runtime, &app, mockDisk, Settings->CustomDiskParams, Settings->CacheParams, Settings->EnableForceFollowers);
// WARNING: must be careful about modifying app data after actor system starts
diff --git a/ydb/core/testlib/test_client.h b/ydb/core/testlib/test_client.h
index 2c4def02ad..5cb51220fa 100644
--- a/ydb/core/testlib/test_client.h
+++ b/ydb/core/testlib/test_client.h
@@ -115,6 +115,7 @@ namespace Tests {
TLoggerInitializer LoggerInitializer;
TStoragePoolKinds StoragePoolTypes;
TVector<NKikimrKqp::TKqpSetting> KqpSettings;
+ bool EnableForceFollowers = false;
bool EnableConsole = true;
bool EnableNodeBroker = false;
bool EnableConfigsDispatcher = true;
@@ -164,6 +165,7 @@ namespace Tests {
TServerSettings& InitKikimrRunConfig() { KikimrRunConfig = std::make_shared<TKikimrRunConfig>(AppConfig); return *this; }
TServerSettings& SetKeyFor(ui32 nodeId, TString keyValue) { NodeKeys[nodeId] = keyValue; return *this; }
TServerSettings& SetEnableKqpSpilling(bool value) { EnableKqpSpilling = value; return *this; }
+ TServerSettings& SetEnableForceFollowers(bool value) { EnableForceFollowers = value; return *this; }
TServerSettings& SetDomainPlanResolution(ui64 resolution) { DomainPlanResolution = resolution; return *this; }
TServerSettings& SetFeatureFlags(const NKikimrConfig::TFeatureFlags& value) { FeatureFlags = value; return *this; }
TServerSettings& SetCompactionConfig(const NKikimrConfig::TCompactionConfig& value) { CompactionConfig = value; return *this; }
diff --git a/ydb/core/tx/datashard/datashard_ut_followers.cpp b/ydb/core/tx/datashard/datashard_ut_followers.cpp
index ea21a77677..33a34d78a1 100644
--- a/ydb/core/tx/datashard/datashard_ut_followers.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_followers.cpp
@@ -89,6 +89,78 @@ Y_UNIT_TEST_SUITE(DataShardFollowers) {
}
}
+ Y_UNIT_TEST(FollowerStaleRo) {
+ TPortManager pm;
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetUseRealThreads(false)
+ .SetEnableForceFollowers(true);
+
+ Tests::TServer::TPtr server = new TServer(serverSettings);
+ auto &runtime = *server->GetRuntime();
+ auto sender = runtime.AllocateEdgeActor();
+
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+ runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG);
+
+ InitRoot(server, sender);
+
+ CreateShardedTable(server, sender, "/Root", "table-1",
+ TShardedTableOptions()
+ .Shards(2)
+ .Followers(1));
+
+ ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1), (2, 2), (3, 3);");
+
+ bool dropFollowerUpdates = true;
+
+ {
+ auto result = KqpSimpleStaleRoExec(runtime, "SELECT * FROM `/Root/table-1`", "/Root");
+ TString expected = "{ items { uint32_value: 1 } items { uint32_value: 1 } }, "
+ "{ items { uint32_value: 2 } items { uint32_value: 2 } }, "
+ "{ items { uint32_value: 3 } items { uint32_value: 3 } }";
+ UNIT_ASSERT_VALUES_EQUAL(result, expected);
+ }
+
+ std::vector<TAutoPtr<IEventHandle>> capturedUpdates;
+ auto captureEvents = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle> &ev) {
+ if (ev->GetTypeRewrite() == NKikimr::TEvTablet::TEvFollowerUpdate::EventType ||
+ ev->GetTypeRewrite() == NKikimr::TEvTablet::TEvFollowerAuxUpdate::EventType ||
+ ev->GetTypeRewrite() == NKikimr::TEvTablet::TEvFUpdate::EventType ||
+ ev->GetTypeRewrite() == NKikimr::TEvTablet::TEvFAuxUpdate::EventType)
+ {
+
+ if (dropFollowerUpdates) {
+ capturedUpdates.emplace_back(ev);
+ return true;
+ }
+ Cerr << "Followers update " << capturedUpdates.size() << Endl;
+ }
+
+ return false;
+ };
+
+ // blocking followers from new log updates.
+ runtime.SetEventFilter(captureEvents);
+
+ ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 4), (2, 5), (3, 6);");
+
+ {
+ auto result = KqpSimpleStaleRoExec(runtime, "SELECT * FROM `/Root/table-1` where key = 1", "/Root");
+ TString expected = "{ items { uint32_value: 1 } items { uint32_value: 1 } }";
+ UNIT_ASSERT_VALUES_EQUAL(result, expected);
+ }
+
+ {
+ // multiple shards, always read from main tablets.
+ auto result = KqpSimpleStaleRoExec(runtime, "SELECT * FROM `/Root/table-1`", "/Root");
+ TString expected = "{ items { uint32_value: 1 } items { uint32_value: 4 } }, "
+ "{ items { uint32_value: 2 } items { uint32_value: 5 } }, "
+ "{ items { uint32_value: 3 } items { uint32_value: 6 } }";
+ UNIT_ASSERT_VALUES_EQUAL(result, expected);
+ }
+ }
+
} // Y_UNIT_TEST_SUITE(DataShardFollowers)
} // namespace NKikimr