author | Aleksei Borzenkov <snaury@gmail.com> | 2022-06-10 11:47:38 +0300
---|---|---
committer | Aleksei Borzenkov <snaury@gmail.com> | 2022-06-10 11:47:38 +0300
commit | 33ab194bb854888f80dcf4ce447d1beba2da8024 (patch) |
tree | 6e2e7ceca86880c5e1fe1c7c80a372853a658861 |
parent | d31f7882f3126d4b8838b19d406b0484d2faaad6 (diff) |
download | ydb-33ab194bb854888f80dcf4ce447d1beba2da8024.tar.gz |
Automatically transform read-only distributed transactions into snapshot reads, KIKIMR-15070
ref:1f4d75cf89014b1305c749b4f93cc29676fa0831
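Editor's note: the commit teaches both execution paths (the new KQP data executer and the legacy MiniKQL tx proxy) to acquire a temporary MVCC snapshot from the long tx service when a read-only transaction touches several shards and the caller did not supply a snapshot, so the reads run as independent immediate operations instead of a coordinated distributed transaction. The sketch below is not part of the commit; it only restates the two `forceSnapshot` gating predicates from the hunks that follow, with a hypothetical `TTxProperties` struct standing in for the executer/proxy member state.

```cpp
#include <cstdint>

// Illustrative container for state the real code keeps in member fields.
struct TTxProperties {
    bool ReadOnly = false;                 // no shard produces effects
    bool Immediate = false;                // already a single-shard / immediate tx
    bool HasPersistentChannels = false;    // cross-shard result channels present
    bool DatabaseKnown = false;            // database name is available
    bool MvccSnapshotReadsEnabled = false; // EnableMvccSnapshotReads feature flag
};

// Mirrors the forceSnapshot condition added to kqp_data_executer.cpp.
bool ShouldAcquireSnapshotKqp(const TTxProperties& tx) {
    return tx.ReadOnly
        && !tx.Immediate
        && !tx.HasPersistentChannels
        && tx.DatabaseKnown
        && tx.MvccSnapshotReadsEnabled;
}

// Mirrors the forceSnapshot condition added to tx_proxy/datareq.cpp: the legacy
// path additionally requires more than one affected shard, no readsets between
// shards, and that ForceOnline was not requested.
bool ShouldAcquireSnapshotLegacyProxy(const TTxProperties& tx,
                                      uint32_t affectedShards,
                                      uint32_t readsetCount,
                                      bool forceOnline) {
    return tx.ReadOnly
        && affectedShards > 1
        && readsetCount == 0
        && !forceOnline
        && tx.DatabaseKnown
        && tx.MvccSnapshotReadsEnabled;
}
```

When the predicate holds, the actor sends TEvAcquireReadSnapshot to the long tx service, waits in a dedicated state (WaitSnapshotState / StateWaitSnapshot), and then continues execution as an immediate read over the acquired snapshot step/txId.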
-rw-r--r-- | ydb/core/engine/mkql_engine_flat.cpp | 6
-rw-r--r-- | ydb/core/engine/mkql_engine_flat.h | 2
-rw-r--r-- | ydb/core/kqp/executer/kqp_data_executer.cpp | 91
-rw-r--r-- | ydb/core/kqp/ut/kqp_limits_ut.cpp | 6
-rw-r--r-- | ydb/core/tx/tx_proxy/datareq.cpp | 73
5 files changed, 166 insertions, 12 deletions
diff --git a/ydb/core/engine/mkql_engine_flat.cpp b/ydb/core/engine/mkql_engine_flat.cpp
index c7fe388eef..32e176a77a 100644
--- a/ydb/core/engine/mkql_engine_flat.cpp
+++ b/ydb/core/engine/mkql_engine_flat.cpp
@@ -380,7 +380,7 @@ public:
         return DbKeys;
     }
 
-    EResult PrepareShardPrograms(const TShardLimits& limits) noexcept override {
+    EResult PrepareShardPrograms(const TShardLimits& limits, ui32* outRSCount) noexcept override {
         Y_VERIFY(!AreAffectedShardsPrepared, "PrepareShardPrograms is already called");
         TGuard<TScopedAlloc> allocGuard(Alloc);
         AffectedShards.clear();
@@ -497,6 +497,10 @@ public:
             return EResult::TooManyRS;
         }
 
+        if (outRSCount) {
+            *outRSCount = readsets.size();
+        }
+
         AreAffectedShardsPrepared = true;
         return EResult::Ok;
     }
diff --git a/ydb/core/engine/mkql_engine_flat.h b/ydb/core/engine/mkql_engine_flat.h
index 7bee67584b..71a9404ec9 100644
--- a/ydb/core/engine/mkql_engine_flat.h
+++ b/ydb/core/engine/mkql_engine_flat.h
@@ -200,7 +200,7 @@ public:
     //-- proxy interface
     virtual EResult SetProgram(TStringBuf program, TStringBuf params = TStringBuf()) noexcept = 0;
     virtual TVector<THolder<TKeyDesc>>& GetDbKeys() noexcept = 0;
-    virtual EResult PrepareShardPrograms(const TShardLimits& shardLimits = TShardLimits()) noexcept = 0;
+    virtual EResult PrepareShardPrograms(const TShardLimits& shardLimits = TShardLimits(), ui32* outRSCount = nullptr) noexcept = 0;
     virtual ui32 GetAffectedShardCount() const noexcept = 0;
     virtual EResult GetAffectedShard(ui32 index, TShardData& data) const noexcept = 0;
     virtual void AfterShardProgramsExtracted() noexcept = 0;
diff --git a/ydb/core/kqp/executer/kqp_data_executer.cpp b/ydb/core/kqp/executer/kqp_data_executer.cpp
index 25a374d211..f5c08aad14 100644
--- a/ydb/core/kqp/executer/kqp_data_executer.cpp
+++ b/ydb/core/kqp/executer/kqp_data_executer.cpp
@@ -18,6 +18,7 @@
 #include <ydb/core/kqp/prepare/kqp_query_plan.h>
 #include <ydb/core/tx/coordinator/coordinator_impl.h>
 #include <ydb/core/tx/datashard/datashard.h>
+#include <ydb/core/tx/long_tx_service/public/events.h>
 #include <ydb/core/tx/tx_proxy/proxy.h>
 #include <ydb/library/yql/dq/runtime/dq_columns_resolve.h>
 
@@ -39,6 +40,7 @@ static constexpr ui32 ReplySizeLimit = 48 * 1024 * 1024; // 48 MB
 
 class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Data> {
     using TBase = TKqpExecuterBase<TKqpDataExecuter, EExecType::Data>;
+    using TKqpSnapshot = IKqpGateway::TKqpSnapshot;
 
     struct TEvPrivate {
         enum EEv {
@@ -1050,6 +1052,8 @@ private:
                         task.Meta.Writes->Ranges.MergeWritePoints(TShardKeyRanges(read.Ranges), keyTypes);
                     }
                 }
+
+                ShardsWithEffects.insert(task.Meta.ShardId);
             }
         } else {
             auto result = (op.GetTypeCase() == NKqpProto::TKqpPhyTableOperation::kUpsertRows)
@@ -1069,6 +1073,8 @@ private:
                 } else {
                     task.Meta.Writes->Ranges.MergeWritePoints(std::move(*shardInfo.KeyWriteRanges), keyTypes);
                 }
+
+                ShardsWithEffects.insert(shardId);
             }
         }
         break;
@@ -1191,14 +1197,14 @@ private:
             << ", locks: " << dataTransaction.GetKqpTransaction().GetLocks().ShortDebugString());
 
         TEvDataShard::TEvProposeTransaction* ev;
-        if (Request.Snapshot.IsValid() && ReadOnlyTx) {
+        if (Snapshot.IsValid() && ReadOnlyTx) {
             ev = new TEvDataShard::TEvProposeTransaction(
                 NKikimrTxDataShard::TX_KIND_DATA,
                 SelfId(),
                 TxId,
                 dataTransaction.SerializeAsString(),
-                Request.Snapshot.Step,
-                Request.Snapshot.TxId,
+                Snapshot.Step,
+                Snapshot.TxId,
                 ImmediateTx ? NTxDataShard::TTxFlags::Immediate : 0);
         } else {
             ev = new TEvDataShard::TEvProposeTransaction(
@@ -1459,16 +1465,74 @@ private:
         if (ReadOnlyTx && Request.Snapshot.IsValid()) {
             // Snapshot reads are always immediate
+            Snapshot = Request.Snapshot;
             ImmediateTx = true;
         }
 
+        const bool forceSnapshot = (
+            ReadOnlyTx &&
+            !ImmediateTx &&
+            !HasPersistentChannels &&
+            !Database.empty() &&
+            AppData()->FeatureFlags.GetEnableMvccSnapshotReads());
+
+        if (forceSnapshot) {
+            ComputeTasks = std::move(computeTasks);
+            DatashardTxs = std::move(datashardTxs);
+
+            auto longTxService = NLongTxService::MakeLongTxServiceID(SelfId().NodeId());
+            Send(longTxService, new NLongTxService::TEvLongTxService::TEvAcquireReadSnapshot(Database));
+
+            LOG_T("Create temporary mvcc snapshot, become WaitSnapshotState");
+            Become(&TKqpDataExecuter::WaitSnapshotState);
+            return;
+        }
+
+        ContinueExecute(computeTasks, datashardTxs);
+    }
+
+    STATEFN(WaitSnapshotState) {
+        try {
+            switch (ev->GetTypeRewrite()) {
+                hFunc(NLongTxService::TEvLongTxService::TEvAcquireReadSnapshotResult, Handle);
+                hFunc(TEvKqp::TEvAbortExecution, HandleAbortExecution);
+                hFunc(TEvents::TEvWakeup, HandleTimeout);
+                default:
+                    UnexpectedEvent("WaitSnapshotState", ev->GetTypeRewrite());
+            }
+        } catch (const yexception& e) {
+            InternalError(e.what());
+        }
+        ReportEventElapsedTime();
+    }
+
+    void Handle(NLongTxService::TEvLongTxService::TEvAcquireReadSnapshotResult::TPtr& ev) {
+        auto& record = ev->Get()->Record;
+
+        if (record.GetStatus() != Ydb::StatusIds::SUCCESS) {
+            ReplyErrorAndDie(record.GetStatus(), record.MutableIssues());
+            return;
+        }
+
+        Snapshot = TKqpSnapshot(record.GetSnapshotStep(), record.GetSnapshotTxId());
+        ImmediateTx = true;
+
+        auto computeTasks = std::move(ComputeTasks);
+        auto datashardTxs = std::move(DatashardTxs);
+        ContinueExecute(computeTasks, datashardTxs);
+    }
+
+    void ContinueExecute(
+        TVector<NDqProto::TDqTask>& computeTasks,
+        THashMap<ui64, NKikimrTxDataShard::TKqpTransaction>& datashardTxs)
+    {
         UseFollowers = Request.IsolationLevel == NKikimrKqp::ISOLATION_LEVEL_READ_STALE;
 
         if (datashardTxs.size() > 1) {
             // Followers only allowed for single shard transactions.
             // (legacy behaviour, for compatibility with current execution engine)
             UseFollowers = false;
         }
 
-        if (Request.Snapshot.IsValid()) {
+        if (Snapshot.IsValid()) {
             // TODO: KIKIMR-11912
             UseFollowers = false;
         }
@@ -1514,7 +1578,9 @@ private:
         TSet<ui64> taskShardIds;
         if (Request.ValidateLocks) {
             for (auto& [shardId, _] : datashardTasks) {
-                taskShardIds.insert(shardId);
+                if (ShardsWithEffects.contains(shardId)) {
+                    taskShardIds.insert(shardId);
+                }
             }
         }
 
@@ -1715,6 +1781,10 @@ public:
         channelDesc.SetIsPersistent(IsCrossShardChannel(TasksGraph, channel));
         channelDesc.SetInMemory(channel.InMemory);
+
+        if (channelDesc.GetIsPersistent()) {
+            HasPersistentChannels = true;
+        }
     }
 
 private:
@@ -1763,6 +1833,17 @@ private:
     TInstant FirstPrepareReply;
     TInstant LastPrepareReply;
+
+    // Tracks which shards are expected to have effects
+    THashSet<ui64> ShardsWithEffects;
+    bool HasPersistentChannels = false;
+
+    // Either requested or temporarily acquired snapshot
+    TKqpSnapshot Snapshot;
+
+    // Temporary storage during snapshot acquisition
+    TVector<NDqProto::TDqTask> ComputeTasks;
+    THashMap<ui64, NKikimrTxDataShard::TKqpTransaction> DatashardTxs;
 };
 
 } // namespace
diff --git a/ydb/core/kqp/ut/kqp_limits_ut.cpp b/ydb/core/kqp/ut/kqp_limits_ut.cpp
index ab03391281..8f82287ed2 100644
--- a/ydb/core/kqp/ut/kqp_limits_ut.cpp
+++ b/ydb/core/kqp/ut/kqp_limits_ut.cpp
@@ -316,7 +316,11 @@ Y_UNIT_TEST_SUITE(KqpLimits) {
         auto& queryLimits = *appConfig.MutableTableServiceConfig()->MutableQueryLimits();
         queryLimits.MutablePhaseLimits()->SetTotalReadSizeLimitBytes(100'000'000);
 
-        TKikimrRunner kikimr(appConfig);
+        auto serverSettings = TKikimrSettings()
+            .SetAppConfig(appConfig)
+            .SetEnableMvccSnapshotReads(false);
+
+        TKikimrRunner kikimr(serverSettings);
         CreateLargeTable(kikimr, 20, 10, 1'000'000, 1);
 
         auto db = kikimr.GetTableClient();
diff --git a/ydb/core/tx/tx_proxy/datareq.cpp b/ydb/core/tx/tx_proxy/datareq.cpp
index af48b42d60..9ce5729899 100644
--- a/ydb/core/tx/tx_proxy/datareq.cpp
+++ b/ydb/core/tx/tx_proxy/datareq.cpp
@@ -3,6 +3,7 @@
 #include <ydb/core/tx/scheme_cache/scheme_cache.h>
 #include <ydb/core/tx/datashard/datashard.h>
 #include <ydb/core/tx/balance_coverage/balance_coverage_builder.h>
+#include <ydb/core/tx/long_tx_service/public/events.h>
 #include <ydb/core/tx/tx_processing.h>
 #include <ydb/core/actorlib_impl/long_timer.h>
 
@@ -308,6 +309,7 @@ private:
     bool CanUseFollower;
     bool StreamResponse;
+    TString DatabaseName;
     TIntrusivePtr<TFlatMKQLRequest> FlatMKQLRequest;
     TIntrusivePtr<TReadTableRequest> ReadTableRequest;
     TString DatashardErrors;
@@ -335,6 +337,7 @@ private:
     TInstant WallClockAccepted;
     TInstant WallClockResolveStarted;
    TInstant WallClockResolved;
+    TInstant WallClockAfterBuild;
     TInstant WallClockPrepared;
     TInstant WallClockPlanned;
 
@@ -358,6 +361,7 @@ private:
     TAutoPtr<TEvTxProxySchemeCache::TEvResolveKeySet> PrepareFlatMKQLRequest(TStringBuf miniKQLProgram, TStringBuf miniKQLParams, const TActorContext &ctx);
     void ProcessFlatMKQLResolve(NSchemeCache::TSchemeCacheRequest *cacheRequest, const TActorContext &ctx);
+    void ContinueFlatMKQLResolve(const TActorContext &ctx);
     void ProcessReadTableResolve(NSchemeCache::TSchemeCacheRequest *cacheRequest, const TActorContext &ctx);
 
     TIntrusivePtr<TTxProxyMon> TxProxyMon;
@@ -406,6 +410,7 @@ private:
     void Handle(TEvTxProxyReq::TEvMakeRequest::TPtr &ev, const TActorContext &ctx);
     void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr &ev, const TActorContext &ctx);
     void Handle(TEvTxProxySchemeCache::TEvResolveKeySetResult::TPtr &ev, const TActorContext &ctx);
+    void Handle(NLongTxService::TEvLongTxService::TEvAcquireReadSnapshotResult::TPtr &ev, const TActorContext &ctx);
     void Handle(TEvDataShard::TEvProposeTransactionRestart::TPtr &ev, const TActorContext &ctx);
     void HandlePrepare(TEvDataShard::TEvProposeTransactionAttachResult::TPtr &ev, const TActorContext &ctx);
     void HandlePrepare(TEvDataShard::TEvProposeTransactionResult::TPtr &ev, const TActorContext &ctx);
@@ -498,6 +503,17 @@ public:
         }
     }
 
+    STFUNC(StateWaitSnapshot) {
+        TRACE_EVENT(NKikimrServices::TX_PROXY);
+        switch (ev->GetTypeRewrite()) {
+            HFuncTraced(NLongTxService::TEvLongTxService::TEvAcquireReadSnapshotResult, Handle);
+            HFuncTraced(TEvTxProcessing::TEvStreamIsDead, Handle);
+            HFuncTraced(TEvents::TEvUndelivered, Handle);
+            CFunc(TEvents::TSystem::Wakeup, HandleExecTimeout);
+            CFunc(TEvPrivate::EvProxyDataReqOngoingTransactionsWatchdog, HandleWatchdog);
+        }
+    }
+
     // resolve zombie state to keep shared key desc
     STFUNC(StateResolveTimeout) {
         TRACE_EVENT(NKikimrServices::TX_PROXY);
@@ -947,8 +963,10 @@ void TDataReq::ProcessFlatMKQLResolve(NSchemeCache::TSchemeCacheRequest *cacheRe
     if (FlatMKQLRequest->Limits.GetReadsetCountLimit()) {
         shardLimits.RSCount = std::min(shardLimits.RSCount, FlatMKQLRequest->Limits.GetReadsetCountLimit());
     }
-    FlatMKQLRequest->EngineResultStatusCode = engine.PrepareShardPrograms(shardLimits);
+    ui32 rsCount = 0;
+    FlatMKQLRequest->EngineResultStatusCode = engine.PrepareShardPrograms(shardLimits, &rsCount);
     auto afterBuild = Now();
+    WallClockAfterBuild = afterBuild;
 
     if (FlatMKQLRequest->EngineResultStatusCode != NMiniKQL::IEngineFlat::EResult::Ok) {
         IssueManager.RaiseIssue(MakeIssue(NKikimrIssues::TIssuesIds::ENGINE_ERROR));
@@ -966,6 +984,53 @@ void TDataReq::ProcessFlatMKQLResolve(NSchemeCache::TSchemeCacheRequest *cacheRe
     if (engine.GetAffectedShardCount() > 1 || FlatMKQLRequest->Snapshot) // TODO KIKIMR-11912
         CanUseFollower = false;
 
+    // Check if we want to use snapshot even when caller didn't provide one
+    const bool forceSnapshot = (
+        FlatMKQLRequest->ReadOnlyProgram &&
+        !FlatMKQLRequest->Snapshot &&
+        rsCount == 0 &&
+        engine.GetAffectedShardCount() > 1 &&
+        ((TxFlags & NTxDataShard::TTxFlags::ForceOnline) == 0) &&
+        AppData(ctx)->FeatureFlags.GetEnableMvccSnapshotReads() &&
+        !DatabaseName.empty());
+
+    if (forceSnapshot) {
+        Send(NLongTxService::MakeLongTxServiceID(ctx.SelfID.NodeId()),
+            new NLongTxService::TEvLongTxService::TEvAcquireReadSnapshot(DatabaseName));
+        Become(&TThis::StateWaitSnapshot);
+        return;
+    }
+
+    ContinueFlatMKQLResolve(ctx);
+}
+
+void TDataReq::Handle(NLongTxService::TEvLongTxService::TEvAcquireReadSnapshotResult::TPtr &ev, const TActorContext &ctx) {
+    const auto& record = ev->Get()->Record;
+
+    if (record.GetStatus() != Ydb::StatusIds::SUCCESS) {
+        NYql::TIssues issues;
+        NYql::IssuesFromMessage(record.GetIssues(), issues);
+        IssueManager.RaiseIssues(issues);
+        ReportStatus(
+            TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ResolveError,
+            NKikimrIssues::TStatusIds::ERROR,
+            true, ctx);
+        Die(ctx);
+        return;
+    }
+
+    // Update timestamp: snapshot creation should not be included in send time histogram
+    WallClockAfterBuild = Now();
+
+    Y_VERIFY(FlatMKQLRequest);
+    FlatMKQLRequest->Snapshot = TRowVersion(record.GetSnapshotStep(), record.GetSnapshotTxId());
+    ContinueFlatMKQLResolve(ctx);
+}
+
+void TDataReq::ContinueFlatMKQLResolve(const TActorContext &ctx) {
+    NMiniKQL::IEngineFlat &engine = *FlatMKQLRequest->Engine;
+    auto &keyDescriptions = engine.GetDbKeys();
+
     TDuration shardCancelAfter = ExecTimeoutPeriod;
     if (CancelAfter) {
         shardCancelAfter = Min(shardCancelAfter, CancelAfter);
     }
@@ -1062,7 +1127,7 @@ void TDataReq::ProcessFlatMKQLResolve(NSchemeCache::TSchemeCacheRequest *cacheRe
     }
     engine.AfterShardProgramsExtracted();
 
-    TxProxyMon->TxPrepareSendShardProgramsHgram->Collect((Now() - afterBuild).MicroSeconds());
+    TxProxyMon->TxPrepareSendShardProgramsHgram->Collect((Now() - WallClockAfterBuild).MicroSeconds());
 
     Become(&TThis::StateWaitPrepare);
 }
@@ -1257,7 +1322,7 @@ void TDataReq::Handle(TEvTxProxyReq::TEvMakeRequest::TPtr &ev, const TActorConte
     if (txbody.HasReadTableTransaction()) {
         ReadTableRequest = new TReadTableRequest(txbody.GetReadTableTransaction());
         TAutoPtr<NSchemeCache::TSchemeCacheNavigate> request(new NSchemeCache::TSchemeCacheNavigate());
-        request->DatabaseName = record.GetDatabaseName();
+        request->DatabaseName = DatabaseName = record.GetDatabaseName();
 
         NSchemeCache::TSchemeCacheNavigate::TEntry entry;
         entry.Path = SplitPath(ReadTableRequest->TablePath);
@@ -1365,7 +1430,7 @@ void TDataReq::Handle(TEvTxProxyReq::TEvMakeRequest::TPtr &ev, const TActorConte
                 NKikimrIssues::TStatusIds::TRANSIENT, false, ctx);
     }
 
-    resolveReq->Request->DatabaseName = record.GetDatabaseName();
+    resolveReq->Request->DatabaseName = DatabaseName = record.GetDatabaseName();
 
     TxProxyMon->MakeRequestProxyAccepted->Inc();
     LOG_DEBUG_S_SAMPLED_BY(ctx, NKikimrServices::TX_PROXY, TxId, "Actor# " << ctx.SelfID.ToString()
        << " txid# " << TxId