diff options
author | ssmike <ssmike@ydb.tech> | 2023-01-23 14:13:08 +0300 |
---|---|---|
committer | ssmike <ssmike@ydb.tech> | 2023-01-23 14:13:08 +0300 |
commit | 0084aa2e47d04b92fd5a741fc7ed7495b02d9ada (patch) | |
tree | 57dbf3b9b197491a0a7cf4d0d86352b2b7af2a09 | |
parent | e6295dbc5649b564ee80854303a408869e949360 (diff) | |
download | ydb-0084aa2e47d04b92fd5a741fc7ed7495b02d9ada.tar.gz |
support locks in all read actors
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_data_executer.cpp | 68 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_executer_impl.h | 17 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_scan_executer.cpp | 2 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_read_actor.cpp | 27 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp | 16 | ||||
-rw-r--r-- | ydb/core/kqp/ut/opt/kqp_ne_ut.cpp | 45 | ||||
-rw-r--r-- | ydb/core/protos/tx_datashard.proto | 7 | ||||
-rw-r--r-- | ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h | 2 | ||||
-rw-r--r-- | ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h | 20 | ||||
-rw-r--r-- | ydb/library/yql/dq/actors/protos/dq_events.proto | 12 |
10 files changed, 201 insertions, 15 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index 48266494cd..27220f1d36 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -204,6 +204,24 @@ public: response.SetStatus(Ydb::StatusIds::SUCCESS); Counters->TxProxyMon->ReportStatusOK->Inc(); + auto addLocks = [&](const NYql::NDqProto::TExtraInputData& data) { + if (data.GetData().Is<NKikimrTxDataShard::TEvKqpInputActorResultInfo>()) { + NKikimrTxDataShard::TEvKqpInputActorResultInfo info; + YQL_ENSURE(data.GetData().UnpackTo(&info), "Failed to unpack settings"); + for (auto& lock : info.GetLocks()) { + Locks.push_back(lock); + } + } + }; + for (auto& [_, data] : ExtraData) { + for (auto& source : data.GetSourcesExtraData()) { + addLocks(source); + } + for (auto& transform : data.GetInputTransformsData()) { + addLocks(transform); + } + } + if (!Locks.empty()) { if (LockHandle) { ResponseEv->LockHandle = std::move(LockHandle); @@ -1423,6 +1441,12 @@ private: } void Execute() { + LockTxId = Request.AcquireLocksTxId; + if (LockTxId.Defined() && *LockTxId == 0) { + LockTxId = TxId; + } + Snapshot = Request.Snapshot; + NWilson::TSpan prepareTasksSpan(TWilsonKqp::DataExecuterPrepateTasks, ExecuterStateSpan.GetTraceId(), "PrepateTasks", NWilson::EFlags::AUTO_END); LWTRACK(KqpDataExecuterStartExecute, ResponseEv->Orbit, TxId); RequestControls.Reqister(TlsActivationContext->AsActorContext()); @@ -1461,7 +1485,7 @@ private: if (stage.SourcesSize() > 0) { switch (stage.GetSources(0).GetTypeCase()) { case NKqpProto::TKqpSource::kReadRangesSource: - BuildScanTasksFromSource(stageInfo); + BuildScanTasksFromSource(stageInfo, Request.Snapshot, LockTxId); break; default: YQL_ENSURE(false, "unknown source type"); @@ -1963,10 +1987,12 @@ private: } void ExecuteTasks() { - auto lockTxId = Request.AcquireLocksTxId; - if (lockTxId.Defined() && *lockTxId == 0) { - lockTxId = TxId; - LockHandle = TLockHandle(TxId, TActivationContext::ActorSystem()); + { + auto lockTxId = Request.AcquireLocksTxId; + if (lockTxId.Defined() && *lockTxId == 0) { + lockTxId = TxId; + LockHandle = TLockHandle(TxId, TActivationContext::ActorSystem()); + } } NWilson::TSpan sendTasksSpan(TWilsonKqp::DataExecuterSendTasksAndTxs, ExecuterStateSpan.GetTraceId(), "SendTasksAndTxs", NWilson::EFlags::AUTO_END); @@ -1976,7 +2002,7 @@ private: TVector<ui64> computeTaskIds{Reserve(ComputeTasks.size())}; for (auto&& taskDesc : ComputeTasks) { computeTaskIds.emplace_back(taskDesc.GetId()); - FillInputSettings(taskDesc, lockTxId); + FillInputSettings(taskDesc); ExecuteDataComputeTask(std::move(taskDesc)); } @@ -1988,7 +2014,7 @@ private: for (auto& taskDesc : tasks) { remoteComputeTasksCnt += 1; - FillInputSettings(taskDesc, lockTxId); + FillInputSettings(taskDesc); PendingComputeTasks.insert(taskDesc.GetId()); tasksPerNode[it->second].emplace_back(std::move(taskDesc)); } @@ -2069,7 +2095,7 @@ private: LOG_D("datashard task: " << taskId << ", proto: " << protoTask.ShortDebugString()); } - ExecuteDatashardTransaction(shardId, shardTx, lockTxId); + ExecuteDatashardTransaction(shardId, shardTx, LockTxId); } ExecuteTopicTabletTransactions(TopicTxs); @@ -2210,7 +2236,7 @@ private: } } - void FillInputSettings(NYql::NDqProto::TDqTask& task, const TMaybe<ui64> lockTxId) { + void FillInputSettings(NYql::NDqProto::TDqTask& task) { for (auto& input : *task.MutableInputs()) { if (input.HasTransform()) { auto transform = input.MutableTransform(); @@ -2230,13 +2256,31 @@ private: settings.MutableSnapshot()->SetTxId(Snapshot.TxId); } - if (lockTxId.Defined()) { - settings.SetLockTxId(*lockTxId); + if (LockTxId.Defined()) { + settings.SetLockTxId(*LockTxId); } settings.SetImmediateTx(ImmediateTx); transform->MutableSettings()->PackFrom(settings); } + if (input.HasSource() && Snapshot != Request.Snapshot && input.GetSource().GetType() == NYql::KqpReadRangesSourceName) { + auto source = input.MutableSource(); + const google::protobuf::Any& settingsAny = source->GetSettings(); + + YQL_ENSURE(settingsAny.Is<NKikimrTxDataShard::TKqpReadRangesSourceSettings>(), "Expected settings type: " + << NKikimrTxDataShard::TKqpReadRangesSourceSettings::descriptor()->full_name() + << " , but got: " << settingsAny.type_url()); + + NKikimrTxDataShard::TKqpReadRangesSourceSettings settings; + YQL_ENSURE(settingsAny.UnpackTo(&settings), "Failed to unpack settings"); + + if (Snapshot.IsValid()) { + settings.MutableSnapshot()->SetStep(Snapshot.Step); + settings.MutableSnapshot()->SetTxId(Snapshot.TxId); + } + + source->MutableSettings()->PackFrom(settings); + } } } @@ -2283,10 +2327,12 @@ private: THashSet<ui64> SubscribedNodes; THashMap<ui64, TVector<NDqProto::TDqTask>> RemoteComputeTasks; + TVector<NDqProto::TDqTask> ComputeTasks; TDatashardTxs DatashardTxs; TTopicTabletTxs TopicTxs; + TMaybe<ui64> LockTxId; // Lock handle for a newly acquired lock TLockHandle LockHandle; ui64 LastShard = 0; diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h index 388d794d0f..a9ae649d51 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h @@ -230,6 +230,7 @@ protected: if (Stats) { Stats->AddComputeActorStats(computeActor.NodeId(), std::move(*state.MutableStats())); } + ExtraData[computeActor].Swap(state.MutableExtraData()); LastTaskId = taskId; LastComputeActorId = computeActor.ToString(); @@ -650,7 +651,7 @@ protected: } } - void BuildScanTasksFromSource(TStageInfo& stageInfo) { + void BuildScanTasksFromSource(TStageInfo& stageInfo, IKqpGateway::TKqpSnapshot snapshot, const TMaybe<ui64> lockTxId = {}) { THashMap<ui64, std::vector<ui64>> nodeTasks; THashMap<ui64, ui64> assignedShardsCount; @@ -720,8 +721,10 @@ protected: settings.SetDataFormat(NKikimrTxDataShard::EScanDataFormat::CELLVEC); } - settings.MutableSnapshot()->SetStep(Request.Snapshot.Step); - settings.MutableSnapshot()->SetTxId(Request.Snapshot.TxId); + if (snapshot.IsValid()) { + settings.MutableSnapshot()->SetStep(snapshot.Step); + settings.MutableSnapshot()->SetTxId(snapshot.TxId); + } shardInfo.KeyReadRanges->SerializeTo(&settings); settings.SetReverse(reverse); @@ -733,6 +736,12 @@ protected: Request.TxAlloc->TypeEnv, itemsLimit, itemsLimitParamName, itemsLimitBytes, itemsLimitType); settings.SetItemsLimit(itemsLimit); + auto self = static_cast<TDerived*>(this)->SelfId(); + if (lockTxId) { + settings.SetLockTxId(*lockTxId); + settings.SetLockNodeId(self.NodeId()); + } + const auto& stageSource = stage.GetSources(0); auto& input = task.Inputs[stageSource.GetInputIndex()]; auto& taskSourceSettings = input.SourceSettings; @@ -1058,6 +1067,8 @@ protected: TActorId KqpTableResolverId; TActorId KqpShardsResolverId; THashMap<TActorId, TProgressStat> PendingComputeActors; // Running compute actors (pure and DS) + THashMap<TActorId, NYql::NDqProto::TComputeActorExtraData> ExtraData; + TVector<TProgressStat> LastStats; TInstant StartResolveTime; diff --git a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp index 64f3c023d8..6a3cde0308 100644 --- a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp @@ -420,7 +420,7 @@ private: if (stage.SourcesSize() > 0) { switch (stage.GetSources(0).GetTypeCase()) { case NKqpProto::TKqpSource::kReadRangesSource: - BuildScanTasksFromSource(stageInfo); + BuildScanTasksFromSource(stageInfo, Request.Snapshot); break; default: YQL_ENSURE(false, "unknown source type"); diff --git a/ydb/core/kqp/runtime/kqp_read_actor.cpp b/ydb/core/kqp/runtime/kqp_read_actor.cpp index 482409e3fb..50aa6536b3 100644 --- a/ydb/core/kqp/runtime/kqp_read_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_read_actor.cpp @@ -572,6 +572,14 @@ public: record.SetResultFormat(Settings.GetDataFormat()); + if (Settings.HasLockTxId()) { + record.SetLockTxId(Settings.GetLockTxId()); + } + + if (Settings.HasLockNodeId()) { + record.SetLockNodeId(Settings.GetLockNodeId()); + } + CA_LOG_D(TStringBuilder() << "Send EvRead to shardId: " << state->TabletId << ", tablePath: " << Settings.GetTable().GetTablePath() << ", ranges: " << DebugPrintRanges(KeyColumnTypes, ev->Ranges, *AppData()->TypeRegistry) << ", readId = " << id); @@ -590,12 +598,19 @@ public: return; } + if (record.BrokenTxLocksSize()) { + return RuntimeError("Transaction locks invalidated.", NYql::NDqProto::StatusIds::ABORTED); + } + if (record.GetStatus().GetCode() != Ydb::StatusIds::SUCCESS) { for (auto& issue : record.GetStatus().GetIssues()) { CA_LOG_D("read id #" << id << " got issue " << issue.Getmessage()); } return RetryRead(id); } + for (auto& lock : record.GetTxLocks()) { + Locks.push_back(lock); + } Reads[id].SerializedContinuationToken = record.GetContinuationToken(); Reads[id].RegisterMessage(*ev->Get()); @@ -827,6 +842,16 @@ public: Send(ComputeActorId, new TEvAsyncInputError(InputIndex, std::move(issues), statusCode)); } + TMaybe<google::protobuf::Any> ExtraData() override { + google::protobuf::Any result; + NKikimrTxDataShard::TEvKqpInputActorResultInfo resultInfo; + for (auto& lock : Locks) { + resultInfo.AddLocks()->CopyFrom(lock); + } + result.PackFrom(resultInfo); + return result; + } + private: NKikimrTxDataShard::TKqpReadRangesSourceSettings Settings; @@ -859,6 +884,8 @@ private: }; TQueue<TResult> Results; + TVector<NKikimrTxDataShard::TLock> Locks; + ui32 MaxInFlight = 1024; const TString LogPrefix; TTableId TableId; diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp index 8da7f66365..f867e92ce5 100644 --- a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp @@ -176,6 +176,16 @@ private: return totalDataSize; } + TMaybe<google::protobuf::Any> ExtraData() override { + google::protobuf::Any result; + NKikimrTxDataShard::TEvKqpInputActorResultInfo resultInfo; + for (auto& lock : Locks) { + resultInfo.AddLocks()->CopyFrom(lock); + } + result.PackFrom(resultInfo); + return result; + } + STFUNC(StateFunc) { Y_UNUSED(ctx); @@ -257,6 +267,10 @@ private: return RetryTableRead(read, continuationToken); } + for (auto& lock : record.GetTxLocks()) { + Locks.push_back(lock); + } + YQL_ENSURE(record.GetResultFormat() == NKikimrTxDataShard::EScanDataFormat::CELLVEC); auto nrows = ev->Get()->GetRowsCount(); for (ui64 rowId = 0; rowId < nrows; ++rowId) { @@ -612,6 +626,8 @@ private: const TDuration SchemeCacheRequestTimeout; NActors::TActorId SchemeCacheRequestTimeoutTimer; const TDuration RetryReadTimeout; + + TVector<NKikimrTxDataShard::TLock> Locks; }; } // namespace diff --git a/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp b/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp index c591b2d63f..8926586790 100644 --- a/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp +++ b/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp @@ -3501,6 +3501,51 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) { CompareYson(R"([[[101u];[1]];[[102u];[3]]])", FormatResultSetYson(result.GetResultSet(0))); } } + + Y_UNIT_TEST(DqSourceLocksEffects) { + TKikimrSettings settings; + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(true); + settings.SetDomainRoot(KikimrDefaultUtDomainRoot); + TFeatureFlags flags; + flags.SetEnablePredicateExtractForDataQueries(true); + settings.SetFeatureFlags(flags); + settings.SetAppConfig(appConfig); + + TKikimrRunner kikimr(settings); + auto db = kikimr.GetTableClient(); + auto session1 = db.CreateSession().GetValueSync().GetSession(); + auto session3 = db.CreateSession().GetValueSync().GetSession(); + + auto result = session1.ExecuteDataQuery(R"( + SELECT * FROM `/Root/TwoShard` WHERE Key <= 1; + )", TTxControl::BeginTx(TTxSettings::SerializableRW())).GetValueSync(); + AssertSuccessResult(result); + + auto tx = result.GetTransaction(); + + auto session2 = db.CreateSession().GetValueSync().GetSession(); + result = session2.ExecuteDataQuery(R"( + UPSERT INTO `/Root/TwoShard` (Key, Value1) VALUES(1, "NewValue2"); + )", TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).GetValueSync(); + AssertSuccessResult(result); + + result = session1.ExecuteDataQuery(R"( + UPSERT INTO `/Root/TwoShard` (Key,Value1) VALUES(1, "NewValue"); + )", TTxControl::Tx(*tx).CommitTx()).GetValueSync(); + UNIT_ASSERT(!result.IsSuccess()); + result.GetIssues().PrintTo(Cerr); + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::ABORTED); + UNIT_ASSERT(HasIssue(result.GetIssues(), NYql::TIssuesIds::KIKIMR_LOCKS_INVALIDATED)); + + result = session2.ExecuteDataQuery(R"( + SELECT Key, Value1 FROM `/Root/TwoShard` WHERE Key <= 1; + )", TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).GetValueSync(); + AssertSuccessResult(result); + + CompareYson(R"([[[1u];["NewValue2"]]])", + FormatResultSetYson(result.GetResultSet(0))); + } } } // namespace NKikimr::NKqp diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto index 450c6635cf..08820a7718 100644 --- a/ydb/core/protos/tx_datashard.proto +++ b/ydb/core/protos/tx_datashard.proto @@ -240,6 +240,10 @@ message TKqpTransaction { optional bool UseGenericReadSets = 7 [default = false]; } +message TEvKqpInputActorResultInfo { + repeated TLock Locks = 1; +} + message TKqpReadRangesSourceSettings { optional TKqpTransaction.TTableMeta Table = 1; @@ -257,6 +261,9 @@ message TKqpReadRangesSourceSettings { optional NKikimrProto.TRowVersion Snapshot = 10; optional uint64 ShardIdHint = 11; optional bool Sorted = 12; + + optional uint64 LockTxId = 13; + optional uint32 LockNodeId = 14; } message TKqpTaskInfo { diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h index 8e45f4f40f..d7245f1632 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h @@ -87,6 +87,8 @@ struct IDqComputeActorAsyncInput { return 0; } + virtual TMaybe<google::protobuf::Any> ExtraData() { return {}; } + // The same signature as IActor::PassAway(). // It is guaranted that this method will be called with bound MKQL allocator. // So, it is the right place to destroy all internal UnboxedValues. diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h index 49b3998213..3a8976f1ec 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h @@ -577,11 +577,31 @@ protected: Terminate(success, TIssues({TIssue(message)})); } + void FillExtraData(NDqProto::TEvComputeActorState& state) { + auto* extraData = state.MutableExtraData(); + for (auto& [index, input] : SourcesMap) { + if (auto data = input.AsyncInput->ExtraData()) { + auto* entry = extraData->AddSourcesExtraData(); + entry->SetIndex(index); + entry->MutableData()->CopyFrom(*data); + } + } + for (auto& [index, input] : SourcesMap) { + if (auto data = input.AsyncInput->ExtraData()) { + auto* entry = extraData->AddInputTransformsData(); + entry->SetIndex(index); + entry->MutableData()->CopyFrom(*data); + } + } + } + void ReportStateAndMaybeDie(NYql::NDqProto::StatusIds::StatusCode statusCode, const TIssues& issues) { auto execEv = MakeHolder<TEvDqCompute::TEvState>(); auto& record = execEv->Record; + FillExtraData(record); + record.SetState(State); record.SetStatusCode(statusCode); record.SetTaskId(Task.GetId()); diff --git a/ydb/library/yql/dq/actors/protos/dq_events.proto b/ydb/library/yql/dq/actors/protos/dq_events.proto index 3baf18902e..e4d0e201ca 100644 --- a/ydb/library/yql/dq/actors/protos/dq_events.proto +++ b/ydb/library/yql/dq/actors/protos/dq_events.proto @@ -1,6 +1,7 @@ package NYql.NDqProto; option cc_enable_arenas = true; +import "google/protobuf/any.proto"; import "library/cpp/actors/protos/actors.proto"; import "ydb/library/yql/dq/actors/protos/dq_stats.proto"; import "ydb/library/yql/dq/actors/protos/dq_status_codes.proto"; @@ -54,6 +55,16 @@ message TComputeActorStats { reserved 2; //optional NKqpProto.TKqpStatsRun LegacyStats = 2; }; +message TExtraInputData { + optional uint32 Index = 1; + optional google.protobuf.Any Data = 2; +} + +message TComputeActorExtraData { + repeated TExtraInputData SourcesExtraData = 1; + repeated TExtraInputData InputTransformsData = 2; +} + message TEvComputeActorState { optional uint32 State = 1; // == EComputeState optional TComputeActorStats LegacyStats = 2; @@ -62,6 +73,7 @@ message TEvComputeActorState { optional uint64 TaskId = 5; optional TDqComputeActorStats Stats = 6; optional NYql.NDqProto.StatusIds.StatusCode StatusCode = 7; + optional TComputeActorExtraData ExtraData = 8; }; message TEvComputeStateRequest { |