aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAleksei Borzenkov <snaury@gmail.com>2022-06-10 11:47:38 +0300
committerAleksei Borzenkov <snaury@gmail.com>2022-06-10 11:47:38 +0300
commit33ab194bb854888f80dcf4ce447d1beba2da8024 (patch)
tree6e2e7ceca86880c5e1fe1c7c80a372853a658861
parentd31f7882f3126d4b8838b19d406b0484d2faaad6 (diff)
downloadydb-33ab194bb854888f80dcf4ce447d1beba2da8024.tar.gz
Automatically transform read-only distributed transactions into snapshot reads, KIKIMR-15070
ref:1f4d75cf89014b1305c749b4f93cc29676fa0831
-rw-r--r--ydb/core/engine/mkql_engine_flat.cpp6
-rw-r--r--ydb/core/engine/mkql_engine_flat.h2
-rw-r--r--ydb/core/kqp/executer/kqp_data_executer.cpp91
-rw-r--r--ydb/core/kqp/ut/kqp_limits_ut.cpp6
-rw-r--r--ydb/core/tx/tx_proxy/datareq.cpp73
5 files changed, 166 insertions, 12 deletions
diff --git a/ydb/core/engine/mkql_engine_flat.cpp b/ydb/core/engine/mkql_engine_flat.cpp
index c7fe388eef..32e176a77a 100644
--- a/ydb/core/engine/mkql_engine_flat.cpp
+++ b/ydb/core/engine/mkql_engine_flat.cpp
@@ -380,7 +380,7 @@ public:
return DbKeys;
}
- EResult PrepareShardPrograms(const TShardLimits& limits) noexcept override {
+ EResult PrepareShardPrograms(const TShardLimits& limits, ui32* outRSCount) noexcept override {
Y_VERIFY(!AreAffectedShardsPrepared, "PrepareShardPrograms is already called");
TGuard<TScopedAlloc> allocGuard(Alloc);
AffectedShards.clear();
@@ -497,6 +497,10 @@ public:
return EResult::TooManyRS;
}
+ if (outRSCount) {
+ *outRSCount = readsets.size();
+ }
+
AreAffectedShardsPrepared = true;
return EResult::Ok;
}
diff --git a/ydb/core/engine/mkql_engine_flat.h b/ydb/core/engine/mkql_engine_flat.h
index 7bee67584b..71a9404ec9 100644
--- a/ydb/core/engine/mkql_engine_flat.h
+++ b/ydb/core/engine/mkql_engine_flat.h
@@ -200,7 +200,7 @@ public:
//-- proxy interface
virtual EResult SetProgram(TStringBuf program, TStringBuf params = TStringBuf()) noexcept = 0;
virtual TVector<THolder<TKeyDesc>>& GetDbKeys() noexcept = 0;
- virtual EResult PrepareShardPrograms(const TShardLimits& shardLimits = TShardLimits()) noexcept = 0;
+ virtual EResult PrepareShardPrograms(const TShardLimits& shardLimits = TShardLimits(), ui32* outRSCount = nullptr) noexcept = 0;
virtual ui32 GetAffectedShardCount() const noexcept = 0;
virtual EResult GetAffectedShard(ui32 index, TShardData& data) const noexcept = 0;
virtual void AfterShardProgramsExtracted() noexcept = 0;
diff --git a/ydb/core/kqp/executer/kqp_data_executer.cpp b/ydb/core/kqp/executer/kqp_data_executer.cpp
index 25a374d211..f5c08aad14 100644
--- a/ydb/core/kqp/executer/kqp_data_executer.cpp
+++ b/ydb/core/kqp/executer/kqp_data_executer.cpp
@@ -18,6 +18,7 @@
#include <ydb/core/kqp/prepare/kqp_query_plan.h>
#include <ydb/core/tx/coordinator/coordinator_impl.h>
#include <ydb/core/tx/datashard/datashard.h>
+#include <ydb/core/tx/long_tx_service/public/events.h>
#include <ydb/core/tx/tx_proxy/proxy.h>
#include <ydb/library/yql/dq/runtime/dq_columns_resolve.h>
@@ -39,6 +40,7 @@ static constexpr ui32 ReplySizeLimit = 48 * 1024 * 1024; // 48 MB
class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Data> {
using TBase = TKqpExecuterBase<TKqpDataExecuter, EExecType::Data>;
+ using TKqpSnapshot = IKqpGateway::TKqpSnapshot;
struct TEvPrivate {
enum EEv {
@@ -1050,6 +1052,8 @@ private:
task.Meta.Writes->Ranges.MergeWritePoints(TShardKeyRanges(read.Ranges), keyTypes);
}
}
+
+ ShardsWithEffects.insert(task.Meta.ShardId);
}
} else {
auto result = (op.GetTypeCase() == NKqpProto::TKqpPhyTableOperation::kUpsertRows)
@@ -1069,6 +1073,8 @@ private:
} else {
task.Meta.Writes->Ranges.MergeWritePoints(std::move(*shardInfo.KeyWriteRanges), keyTypes);
}
+
+ ShardsWithEffects.insert(shardId);
}
}
break;
@@ -1191,14 +1197,14 @@ private:
<< ", locks: " << dataTransaction.GetKqpTransaction().GetLocks().ShortDebugString());
TEvDataShard::TEvProposeTransaction* ev;
- if (Request.Snapshot.IsValid() && ReadOnlyTx) {
+ if (Snapshot.IsValid() && ReadOnlyTx) {
ev = new TEvDataShard::TEvProposeTransaction(
NKikimrTxDataShard::TX_KIND_DATA,
SelfId(),
TxId,
dataTransaction.SerializeAsString(),
- Request.Snapshot.Step,
- Request.Snapshot.TxId,
+ Snapshot.Step,
+ Snapshot.TxId,
ImmediateTx ? NTxDataShard::TTxFlags::Immediate : 0);
} else {
ev = new TEvDataShard::TEvProposeTransaction(
@@ -1459,16 +1465,74 @@ private:
if (ReadOnlyTx && Request.Snapshot.IsValid()) {
// Snapshot reads are always immediate
+ Snapshot = Request.Snapshot;
ImmediateTx = true;
}
+ const bool forceSnapshot = (
+ ReadOnlyTx &&
+ !ImmediateTx &&
+ !HasPersistentChannels &&
+ !Database.empty() &&
+ AppData()->FeatureFlags.GetEnableMvccSnapshotReads());
+
+ if (forceSnapshot) {
+ ComputeTasks = std::move(computeTasks);
+ DatashardTxs = std::move(datashardTxs);
+
+ auto longTxService = NLongTxService::MakeLongTxServiceID(SelfId().NodeId());
+ Send(longTxService, new NLongTxService::TEvLongTxService::TEvAcquireReadSnapshot(Database));
+
+ LOG_T("Create temporary mvcc snapshot, ebcome WaitSnapshotState");
+ Become(&TKqpDataExecuter::WaitSnapshotState);
+ return;
+ }
+
+ ContinueExecute(computeTasks, datashardTxs);
+ }
+
+ STATEFN(WaitSnapshotState) {
+ try {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(NLongTxService::TEvLongTxService::TEvAcquireReadSnapshotResult, Handle);
+ hFunc(TEvKqp::TEvAbortExecution, HandleAbortExecution);
+ hFunc(TEvents::TEvWakeup, HandleTimeout);
+ default:
+ UnexpectedEvent("WaitSnapshotState", ev->GetTypeRewrite());
+ }
+ } catch (const yexception& e) {
+ InternalError(e.what());
+ }
+ ReportEventElapsedTime();
+ }
+
+ void Handle(NLongTxService::TEvLongTxService::TEvAcquireReadSnapshotResult::TPtr& ev) {
+ auto& record = ev->Get()->Record;
+
+ if (record.GetStatus() != Ydb::StatusIds::SUCCESS) {
+ ReplyErrorAndDie(record.GetStatus(), record.MutableIssues());
+ return;
+ }
+
+ Snapshot = TKqpSnapshot(record.GetSnapshotStep(), record.GetSnapshotTxId());
+ ImmediateTx = true;
+
+ auto computeTasks = std::move(ComputeTasks);
+ auto datashardTxs = std::move(DatashardTxs);
+ ContinueExecute(computeTasks, datashardTxs);
+ }
+
+ void ContinueExecute(
+ TVector<NDqProto::TDqTask>& computeTasks,
+ THashMap<ui64, NKikimrTxDataShard::TKqpTransaction>& datashardTxs)
+ {
UseFollowers = Request.IsolationLevel == NKikimrKqp::ISOLATION_LEVEL_READ_STALE;
if (datashardTxs.size() > 1) {
// Followers only allowed for single shard transactions.
// (legacy behaviour, for compatibility with current execution engine)
UseFollowers = false;
}
- if (Request.Snapshot.IsValid()) {
+ if (Snapshot.IsValid()) {
// TODO: KIKIMR-11912
UseFollowers = false;
}
@@ -1514,7 +1578,9 @@ private:
TSet<ui64> taskShardIds;
if (Request.ValidateLocks) {
for (auto& [shardId, _] : datashardTasks) {
- taskShardIds.insert(shardId);
+ if (ShardsWithEffects.contains(shardId)) {
+ taskShardIds.insert(shardId);
+ }
}
}
@@ -1715,6 +1781,10 @@ public:
channelDesc.SetIsPersistent(IsCrossShardChannel(TasksGraph, channel));
channelDesc.SetInMemory(channel.InMemory);
+
+ if (channelDesc.GetIsPersistent()) {
+ HasPersistentChannels = true;
+ }
}
private:
@@ -1763,6 +1833,17 @@ private:
TInstant FirstPrepareReply;
TInstant LastPrepareReply;
+
+ // Tracks which shards are expected to have effects
+ THashSet<ui64> ShardsWithEffects;
+ bool HasPersistentChannels = false;
+
+ // Either requested or temporarily acquired snapshot
+ TKqpSnapshot Snapshot;
+
+ // Temporary storage during snapshot acquisition
+ TVector<NDqProto::TDqTask> ComputeTasks;
+ THashMap<ui64, NKikimrTxDataShard::TKqpTransaction> DatashardTxs;
};
} // namespace
diff --git a/ydb/core/kqp/ut/kqp_limits_ut.cpp b/ydb/core/kqp/ut/kqp_limits_ut.cpp
index ab03391281..8f82287ed2 100644
--- a/ydb/core/kqp/ut/kqp_limits_ut.cpp
+++ b/ydb/core/kqp/ut/kqp_limits_ut.cpp
@@ -316,7 +316,11 @@ Y_UNIT_TEST_SUITE(KqpLimits) {
auto& queryLimits = *appConfig.MutableTableServiceConfig()->MutableQueryLimits();
queryLimits.MutablePhaseLimits()->SetTotalReadSizeLimitBytes(100'000'000);
- TKikimrRunner kikimr(appConfig);
+ auto serverSettings = TKikimrSettings()
+ .SetAppConfig(appConfig)
+ .SetEnableMvccSnapshotReads(false);
+
+ TKikimrRunner kikimr(serverSettings);
CreateLargeTable(kikimr, 20, 10, 1'000'000, 1);
auto db = kikimr.GetTableClient();
diff --git a/ydb/core/tx/tx_proxy/datareq.cpp b/ydb/core/tx/tx_proxy/datareq.cpp
index af48b42d60..9ce5729899 100644
--- a/ydb/core/tx/tx_proxy/datareq.cpp
+++ b/ydb/core/tx/tx_proxy/datareq.cpp
@@ -3,6 +3,7 @@
#include <ydb/core/tx/scheme_cache/scheme_cache.h>
#include <ydb/core/tx/datashard/datashard.h>
#include <ydb/core/tx/balance_coverage/balance_coverage_builder.h>
+#include <ydb/core/tx/long_tx_service/public/events.h>
#include <ydb/core/tx/tx_processing.h>
#include <ydb/core/actorlib_impl/long_timer.h>
@@ -308,6 +309,7 @@ private:
bool CanUseFollower;
bool StreamResponse;
+ TString DatabaseName;
TIntrusivePtr<TFlatMKQLRequest> FlatMKQLRequest;
TIntrusivePtr<TReadTableRequest> ReadTableRequest;
TString DatashardErrors;
@@ -335,6 +337,7 @@ private:
TInstant WallClockAccepted;
TInstant WallClockResolveStarted;
TInstant WallClockResolved;
+ TInstant WallClockAfterBuild;
TInstant WallClockPrepared;
TInstant WallClockPlanned;
@@ -358,6 +361,7 @@ private:
TAutoPtr<TEvTxProxySchemeCache::TEvResolveKeySet> PrepareFlatMKQLRequest(TStringBuf miniKQLProgram, TStringBuf miniKQLParams, const TActorContext &ctx);
void ProcessFlatMKQLResolve(NSchemeCache::TSchemeCacheRequest *cacheRequest, const TActorContext &ctx);
+ void ContinueFlatMKQLResolve(const TActorContext &ctx);
void ProcessReadTableResolve(NSchemeCache::TSchemeCacheRequest *cacheRequest, const TActorContext &ctx);
TIntrusivePtr<TTxProxyMon> TxProxyMon;
@@ -406,6 +410,7 @@ private:
void Handle(TEvTxProxyReq::TEvMakeRequest::TPtr &ev, const TActorContext &ctx);
void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr &ev, const TActorContext &ctx);
void Handle(TEvTxProxySchemeCache::TEvResolveKeySetResult::TPtr &ev, const TActorContext &ctx);
+ void Handle(NLongTxService::TEvLongTxService::TEvAcquireReadSnapshotResult::TPtr &ev, const TActorContext &ctx);
void Handle(TEvDataShard::TEvProposeTransactionRestart::TPtr &ev, const TActorContext &ctx);
void HandlePrepare(TEvDataShard::TEvProposeTransactionAttachResult::TPtr &ev, const TActorContext &ctx);
void HandlePrepare(TEvDataShard::TEvProposeTransactionResult::TPtr &ev, const TActorContext &ctx);
@@ -498,6 +503,17 @@ public:
}
}
+ STFUNC(StateWaitSnapshot) {
+ TRACE_EVENT(NKikimrServices::TX_PROXY);
+ switch (ev->GetTypeRewrite()) {
+ HFuncTraced(NLongTxService::TEvLongTxService::TEvAcquireReadSnapshotResult, Handle);
+ HFuncTraced(TEvTxProcessing::TEvStreamIsDead, Handle);
+ HFuncTraced(TEvents::TEvUndelivered, Handle);
+ CFunc(TEvents::TSystem::Wakeup, HandleExecTimeout);
+ CFunc(TEvPrivate::EvProxyDataReqOngoingTransactionsWatchdog, HandleWatchdog);
+ }
+ }
+
// resolve zombie state to keep shared key desc
STFUNC(StateResolveTimeout) {
TRACE_EVENT(NKikimrServices::TX_PROXY);
@@ -947,8 +963,10 @@ void TDataReq::ProcessFlatMKQLResolve(NSchemeCache::TSchemeCacheRequest *cacheRe
if (FlatMKQLRequest->Limits.GetReadsetCountLimit()) {
shardLimits.RSCount = std::min(shardLimits.RSCount, FlatMKQLRequest->Limits.GetReadsetCountLimit());
}
- FlatMKQLRequest->EngineResultStatusCode = engine.PrepareShardPrograms(shardLimits);
+ ui32 rsCount = 0;
+ FlatMKQLRequest->EngineResultStatusCode = engine.PrepareShardPrograms(shardLimits, &rsCount);
auto afterBuild = Now();
+ WallClockAfterBuild = afterBuild;
if (FlatMKQLRequest->EngineResultStatusCode != NMiniKQL::IEngineFlat::EResult::Ok) {
IssueManager.RaiseIssue(MakeIssue(NKikimrIssues::TIssuesIds::ENGINE_ERROR));
@@ -966,6 +984,53 @@ void TDataReq::ProcessFlatMKQLResolve(NSchemeCache::TSchemeCacheRequest *cacheRe
if (engine.GetAffectedShardCount() > 1 || FlatMKQLRequest->Snapshot) // TODO KIKIMR-11912
CanUseFollower = false;
+ // Check if we want to use snapshot even when caller didn't provide one
+ const bool forceSnapshot = (
+ FlatMKQLRequest->ReadOnlyProgram &&
+ !FlatMKQLRequest->Snapshot &&
+ rsCount == 0 &&
+ engine.GetAffectedShardCount() > 1 &&
+ ((TxFlags & NTxDataShard::TTxFlags::ForceOnline) == 0) &&
+ AppData(ctx)->FeatureFlags.GetEnableMvccSnapshotReads() &&
+ !DatabaseName.empty());
+
+ if (forceSnapshot) {
+ Send(NLongTxService::MakeLongTxServiceID(ctx.SelfID.NodeId()),
+ new NLongTxService::TEvLongTxService::TEvAcquireReadSnapshot(DatabaseName));
+ Become(&TThis::StateWaitSnapshot);
+ return;
+ }
+
+ ContinueFlatMKQLResolve(ctx);
+}
+
+void TDataReq::Handle(NLongTxService::TEvLongTxService::TEvAcquireReadSnapshotResult::TPtr &ev, const TActorContext &ctx) {
+ const auto& record = ev->Get()->Record;
+
+ if (record.GetStatus() != Ydb::StatusIds::SUCCESS) {
+ NYql::TIssues issues;
+ NYql::IssuesFromMessage(record.GetIssues(), issues);
+ IssueManager.RaiseIssues(issues);
+ ReportStatus(
+ TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ResolveError,
+ NKikimrIssues::TStatusIds::ERROR,
+ true, ctx);
+ Die(ctx);
+ return;
+ }
+
+ // Update timestamp: snapshot creation should not be included in send time histogram
+ WallClockAfterBuild = Now();
+
+ Y_VERIFY(FlatMKQLRequest);
+ FlatMKQLRequest->Snapshot = TRowVersion(record.GetSnapshotStep(), record.GetSnapshotTxId());
+ ContinueFlatMKQLResolve(ctx);
+}
+
+void TDataReq::ContinueFlatMKQLResolve(const TActorContext &ctx) {
+ NMiniKQL::IEngineFlat &engine = *FlatMKQLRequest->Engine;
+ auto &keyDescriptions = engine.GetDbKeys();
+
TDuration shardCancelAfter = ExecTimeoutPeriod;
if (CancelAfter) {
shardCancelAfter = Min(shardCancelAfter, CancelAfter);
@@ -1062,7 +1127,7 @@ void TDataReq::ProcessFlatMKQLResolve(NSchemeCache::TSchemeCacheRequest *cacheRe
}
engine.AfterShardProgramsExtracted();
- TxProxyMon->TxPrepareSendShardProgramsHgram->Collect((Now() - afterBuild).MicroSeconds());
+ TxProxyMon->TxPrepareSendShardProgramsHgram->Collect((Now() - WallClockAfterBuild).MicroSeconds());
Become(&TThis::StateWaitPrepare);
}
@@ -1257,7 +1322,7 @@ void TDataReq::Handle(TEvTxProxyReq::TEvMakeRequest::TPtr &ev, const TActorConte
if (txbody.HasReadTableTransaction()) {
ReadTableRequest = new TReadTableRequest(txbody.GetReadTableTransaction());
TAutoPtr<NSchemeCache::TSchemeCacheNavigate> request(new NSchemeCache::TSchemeCacheNavigate());
- request->DatabaseName = record.GetDatabaseName();
+ request->DatabaseName = DatabaseName = record.GetDatabaseName();
NSchemeCache::TSchemeCacheNavigate::TEntry entry;
entry.Path = SplitPath(ReadTableRequest->TablePath);
@@ -1365,7 +1430,7 @@ void TDataReq::Handle(TEvTxProxyReq::TEvMakeRequest::TPtr &ev, const TActorConte
NKikimrIssues::TStatusIds::TRANSIENT, false, ctx);
}
- resolveReq->Request->DatabaseName = record.GetDatabaseName();
+ resolveReq->Request->DatabaseName = DatabaseName = record.GetDatabaseName();
TxProxyMon->MakeRequestProxyAccepted->Inc();
LOG_DEBUG_S_SAMPLED_BY(ctx, NKikimrServices::TX_PROXY, TxId,
"Actor# " << ctx.SelfID.ToString() << " txid# " << TxId