aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorssmike <ssmike@ydb.tech>2023-01-23 14:13:08 +0300
committerssmike <ssmike@ydb.tech>2023-01-23 14:13:08 +0300
commit0084aa2e47d04b92fd5a741fc7ed7495b02d9ada (patch)
tree57dbf3b9b197491a0a7cf4d0d86352b2b7af2a09
parente6295dbc5649b564ee80854303a408869e949360 (diff)
downloadydb-0084aa2e47d04b92fd5a741fc7ed7495b02d9ada.tar.gz
support locks in all read actors
-rw-r--r--ydb/core/kqp/executer_actor/kqp_data_executer.cpp68
-rw-r--r--ydb/core/kqp/executer_actor/kqp_executer_impl.h17
-rw-r--r--ydb/core/kqp/executer_actor/kqp_scan_executer.cpp2
-rw-r--r--ydb/core/kqp/runtime/kqp_read_actor.cpp27
-rw-r--r--ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp16
-rw-r--r--ydb/core/kqp/ut/opt/kqp_ne_ut.cpp45
-rw-r--r--ydb/core/protos/tx_datashard.proto7
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h2
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h20
-rw-r--r--ydb/library/yql/dq/actors/protos/dq_events.proto12
10 files changed, 201 insertions, 15 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
index 48266494cd..27220f1d36 100644
--- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
@@ -204,6 +204,24 @@ public:
response.SetStatus(Ydb::StatusIds::SUCCESS);
Counters->TxProxyMon->ReportStatusOK->Inc();
+ auto addLocks = [&](const NYql::NDqProto::TExtraInputData& data) {
+ if (data.GetData().Is<NKikimrTxDataShard::TEvKqpInputActorResultInfo>()) {
+ NKikimrTxDataShard::TEvKqpInputActorResultInfo info;
+ YQL_ENSURE(data.GetData().UnpackTo(&info), "Failed to unpack settings");
+ for (auto& lock : info.GetLocks()) {
+ Locks.push_back(lock);
+ }
+ }
+ };
+ for (auto& [_, data] : ExtraData) {
+ for (auto& source : data.GetSourcesExtraData()) {
+ addLocks(source);
+ }
+ for (auto& transform : data.GetInputTransformsData()) {
+ addLocks(transform);
+ }
+ }
+
if (!Locks.empty()) {
if (LockHandle) {
ResponseEv->LockHandle = std::move(LockHandle);
@@ -1423,6 +1441,12 @@ private:
}
void Execute() {
+ LockTxId = Request.AcquireLocksTxId;
+ if (LockTxId.Defined() && *LockTxId == 0) {
+ LockTxId = TxId;
+ }
+ Snapshot = Request.Snapshot;
+
NWilson::TSpan prepareTasksSpan(TWilsonKqp::DataExecuterPrepateTasks, ExecuterStateSpan.GetTraceId(), "PrepateTasks", NWilson::EFlags::AUTO_END);
LWTRACK(KqpDataExecuterStartExecute, ResponseEv->Orbit, TxId);
RequestControls.Reqister(TlsActivationContext->AsActorContext());
@@ -1461,7 +1485,7 @@ private:
if (stage.SourcesSize() > 0) {
switch (stage.GetSources(0).GetTypeCase()) {
case NKqpProto::TKqpSource::kReadRangesSource:
- BuildScanTasksFromSource(stageInfo);
+ BuildScanTasksFromSource(stageInfo, Request.Snapshot, LockTxId);
break;
default:
YQL_ENSURE(false, "unknown source type");
@@ -1963,10 +1987,12 @@ private:
}
void ExecuteTasks() {
- auto lockTxId = Request.AcquireLocksTxId;
- if (lockTxId.Defined() && *lockTxId == 0) {
- lockTxId = TxId;
- LockHandle = TLockHandle(TxId, TActivationContext::ActorSystem());
+ {
+ auto lockTxId = Request.AcquireLocksTxId;
+ if (lockTxId.Defined() && *lockTxId == 0) {
+ lockTxId = TxId;
+ LockHandle = TLockHandle(TxId, TActivationContext::ActorSystem());
+ }
}
NWilson::TSpan sendTasksSpan(TWilsonKqp::DataExecuterSendTasksAndTxs, ExecuterStateSpan.GetTraceId(), "SendTasksAndTxs", NWilson::EFlags::AUTO_END);
@@ -1976,7 +2002,7 @@ private:
TVector<ui64> computeTaskIds{Reserve(ComputeTasks.size())};
for (auto&& taskDesc : ComputeTasks) {
computeTaskIds.emplace_back(taskDesc.GetId());
- FillInputSettings(taskDesc, lockTxId);
+ FillInputSettings(taskDesc);
ExecuteDataComputeTask(std::move(taskDesc));
}
@@ -1988,7 +2014,7 @@ private:
for (auto& taskDesc : tasks) {
remoteComputeTasksCnt += 1;
- FillInputSettings(taskDesc, lockTxId);
+ FillInputSettings(taskDesc);
PendingComputeTasks.insert(taskDesc.GetId());
tasksPerNode[it->second].emplace_back(std::move(taskDesc));
}
@@ -2069,7 +2095,7 @@ private:
LOG_D("datashard task: " << taskId << ", proto: " << protoTask.ShortDebugString());
}
- ExecuteDatashardTransaction(shardId, shardTx, lockTxId);
+ ExecuteDatashardTransaction(shardId, shardTx, LockTxId);
}
ExecuteTopicTabletTransactions(TopicTxs);
@@ -2210,7 +2236,7 @@ private:
}
}
- void FillInputSettings(NYql::NDqProto::TDqTask& task, const TMaybe<ui64> lockTxId) {
+ void FillInputSettings(NYql::NDqProto::TDqTask& task) {
for (auto& input : *task.MutableInputs()) {
if (input.HasTransform()) {
auto transform = input.MutableTransform();
@@ -2230,13 +2256,31 @@ private:
settings.MutableSnapshot()->SetTxId(Snapshot.TxId);
}
- if (lockTxId.Defined()) {
- settings.SetLockTxId(*lockTxId);
+ if (LockTxId.Defined()) {
+ settings.SetLockTxId(*LockTxId);
}
settings.SetImmediateTx(ImmediateTx);
transform->MutableSettings()->PackFrom(settings);
}
+ if (input.HasSource() && Snapshot != Request.Snapshot && input.GetSource().GetType() == NYql::KqpReadRangesSourceName) {
+ auto source = input.MutableSource();
+ const google::protobuf::Any& settingsAny = source->GetSettings();
+
+ YQL_ENSURE(settingsAny.Is<NKikimrTxDataShard::TKqpReadRangesSourceSettings>(), "Expected settings type: "
+ << NKikimrTxDataShard::TKqpReadRangesSourceSettings::descriptor()->full_name()
+ << " , but got: " << settingsAny.type_url());
+
+ NKikimrTxDataShard::TKqpReadRangesSourceSettings settings;
+ YQL_ENSURE(settingsAny.UnpackTo(&settings), "Failed to unpack settings");
+
+ if (Snapshot.IsValid()) {
+ settings.MutableSnapshot()->SetStep(Snapshot.Step);
+ settings.MutableSnapshot()->SetTxId(Snapshot.TxId);
+ }
+
+ source->MutableSettings()->PackFrom(settings);
+ }
}
}
@@ -2283,10 +2327,12 @@ private:
THashSet<ui64> SubscribedNodes;
THashMap<ui64, TVector<NDqProto::TDqTask>> RemoteComputeTasks;
+
TVector<NDqProto::TDqTask> ComputeTasks;
TDatashardTxs DatashardTxs;
TTopicTabletTxs TopicTxs;
+ TMaybe<ui64> LockTxId;
// Lock handle for a newly acquired lock
TLockHandle LockHandle;
ui64 LastShard = 0;
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
index 388d794d0f..a9ae649d51 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h
+++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
@@ -230,6 +230,7 @@ protected:
if (Stats) {
Stats->AddComputeActorStats(computeActor.NodeId(), std::move(*state.MutableStats()));
}
+ ExtraData[computeActor].Swap(state.MutableExtraData());
LastTaskId = taskId;
LastComputeActorId = computeActor.ToString();
@@ -650,7 +651,7 @@ protected:
}
}
- void BuildScanTasksFromSource(TStageInfo& stageInfo) {
+ void BuildScanTasksFromSource(TStageInfo& stageInfo, IKqpGateway::TKqpSnapshot snapshot, const TMaybe<ui64> lockTxId = {}) {
THashMap<ui64, std::vector<ui64>> nodeTasks;
THashMap<ui64, ui64> assignedShardsCount;
@@ -720,8 +721,10 @@ protected:
settings.SetDataFormat(NKikimrTxDataShard::EScanDataFormat::CELLVEC);
}
- settings.MutableSnapshot()->SetStep(Request.Snapshot.Step);
- settings.MutableSnapshot()->SetTxId(Request.Snapshot.TxId);
+ if (snapshot.IsValid()) {
+ settings.MutableSnapshot()->SetStep(snapshot.Step);
+ settings.MutableSnapshot()->SetTxId(snapshot.TxId);
+ }
shardInfo.KeyReadRanges->SerializeTo(&settings);
settings.SetReverse(reverse);
@@ -733,6 +736,12 @@ protected:
Request.TxAlloc->TypeEnv, itemsLimit, itemsLimitParamName, itemsLimitBytes, itemsLimitType);
settings.SetItemsLimit(itemsLimit);
+ auto self = static_cast<TDerived*>(this)->SelfId();
+ if (lockTxId) {
+ settings.SetLockTxId(*lockTxId);
+ settings.SetLockNodeId(self.NodeId());
+ }
+
const auto& stageSource = stage.GetSources(0);
auto& input = task.Inputs[stageSource.GetInputIndex()];
auto& taskSourceSettings = input.SourceSettings;
@@ -1058,6 +1067,8 @@ protected:
TActorId KqpTableResolverId;
TActorId KqpShardsResolverId;
THashMap<TActorId, TProgressStat> PendingComputeActors; // Running compute actors (pure and DS)
+ THashMap<TActorId, NYql::NDqProto::TComputeActorExtraData> ExtraData;
+
TVector<TProgressStat> LastStats;
TInstant StartResolveTime;
diff --git a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
index 64f3c023d8..6a3cde0308 100644
--- a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
@@ -420,7 +420,7 @@ private:
if (stage.SourcesSize() > 0) {
switch (stage.GetSources(0).GetTypeCase()) {
case NKqpProto::TKqpSource::kReadRangesSource:
- BuildScanTasksFromSource(stageInfo);
+ BuildScanTasksFromSource(stageInfo, Request.Snapshot);
break;
default:
YQL_ENSURE(false, "unknown source type");
diff --git a/ydb/core/kqp/runtime/kqp_read_actor.cpp b/ydb/core/kqp/runtime/kqp_read_actor.cpp
index 482409e3fb..50aa6536b3 100644
--- a/ydb/core/kqp/runtime/kqp_read_actor.cpp
+++ b/ydb/core/kqp/runtime/kqp_read_actor.cpp
@@ -572,6 +572,14 @@ public:
record.SetResultFormat(Settings.GetDataFormat());
+ if (Settings.HasLockTxId()) {
+ record.SetLockTxId(Settings.GetLockTxId());
+ }
+
+ if (Settings.HasLockNodeId()) {
+ record.SetLockNodeId(Settings.GetLockNodeId());
+ }
+
CA_LOG_D(TStringBuilder() << "Send EvRead to shardId: " << state->TabletId << ", tablePath: " << Settings.GetTable().GetTablePath()
<< ", ranges: " << DebugPrintRanges(KeyColumnTypes, ev->Ranges, *AppData()->TypeRegistry)
<< ", readId = " << id);
@@ -590,12 +598,19 @@ public:
return;
}
+ if (record.BrokenTxLocksSize()) {
+ return RuntimeError("Transaction locks invalidated.", NYql::NDqProto::StatusIds::ABORTED);
+ }
+
if (record.GetStatus().GetCode() != Ydb::StatusIds::SUCCESS) {
for (auto& issue : record.GetStatus().GetIssues()) {
CA_LOG_D("read id #" << id << " got issue " << issue.Getmessage());
}
return RetryRead(id);
}
+ for (auto& lock : record.GetTxLocks()) {
+ Locks.push_back(lock);
+ }
Reads[id].SerializedContinuationToken = record.GetContinuationToken();
Reads[id].RegisterMessage(*ev->Get());
@@ -827,6 +842,16 @@ public:
Send(ComputeActorId, new TEvAsyncInputError(InputIndex, std::move(issues), statusCode));
}
+ TMaybe<google::protobuf::Any> ExtraData() override {
+ google::protobuf::Any result;
+ NKikimrTxDataShard::TEvKqpInputActorResultInfo resultInfo;
+ for (auto& lock : Locks) {
+ resultInfo.AddLocks()->CopyFrom(lock);
+ }
+ result.PackFrom(resultInfo);
+ return result;
+ }
+
private:
NKikimrTxDataShard::TKqpReadRangesSourceSettings Settings;
@@ -859,6 +884,8 @@ private:
};
TQueue<TResult> Results;
+ TVector<NKikimrTxDataShard::TLock> Locks;
+
ui32 MaxInFlight = 1024;
const TString LogPrefix;
TTableId TableId;
diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
index 8da7f66365..f867e92ce5 100644
--- a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
+++ b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
@@ -176,6 +176,16 @@ private:
return totalDataSize;
}
+ TMaybe<google::protobuf::Any> ExtraData() override {
+ google::protobuf::Any result;
+ NKikimrTxDataShard::TEvKqpInputActorResultInfo resultInfo;
+ for (auto& lock : Locks) {
+ resultInfo.AddLocks()->CopyFrom(lock);
+ }
+ result.PackFrom(resultInfo);
+ return result;
+ }
+
STFUNC(StateFunc) {
Y_UNUSED(ctx);
@@ -257,6 +267,10 @@ private:
return RetryTableRead(read, continuationToken);
}
+ for (auto& lock : record.GetTxLocks()) {
+ Locks.push_back(lock);
+ }
+
YQL_ENSURE(record.GetResultFormat() == NKikimrTxDataShard::EScanDataFormat::CELLVEC);
auto nrows = ev->Get()->GetRowsCount();
for (ui64 rowId = 0; rowId < nrows; ++rowId) {
@@ -612,6 +626,8 @@ private:
const TDuration SchemeCacheRequestTimeout;
NActors::TActorId SchemeCacheRequestTimeoutTimer;
const TDuration RetryReadTimeout;
+
+ TVector<NKikimrTxDataShard::TLock> Locks;
};
} // namespace
diff --git a/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp b/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp
index c591b2d63f..8926586790 100644
--- a/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp
+++ b/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp
@@ -3501,6 +3501,51 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
CompareYson(R"([[[101u];[1]];[[102u];[3]]])", FormatResultSetYson(result.GetResultSet(0)));
}
}
+
+ Y_UNIT_TEST(DqSourceLocksEffects) {
+ TKikimrSettings settings;
+ NKikimrConfig::TAppConfig appConfig;
+ appConfig.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(true);
+ settings.SetDomainRoot(KikimrDefaultUtDomainRoot);
+ TFeatureFlags flags;
+ flags.SetEnablePredicateExtractForDataQueries(true);
+ settings.SetFeatureFlags(flags);
+ settings.SetAppConfig(appConfig);
+
+ TKikimrRunner kikimr(settings);
+ auto db = kikimr.GetTableClient();
+ auto session1 = db.CreateSession().GetValueSync().GetSession();
+ auto session3 = db.CreateSession().GetValueSync().GetSession();
+
+ auto result = session1.ExecuteDataQuery(R"(
+ SELECT * FROM `/Root/TwoShard` WHERE Key <= 1;
+ )", TTxControl::BeginTx(TTxSettings::SerializableRW())).GetValueSync();
+ AssertSuccessResult(result);
+
+ auto tx = result.GetTransaction();
+
+ auto session2 = db.CreateSession().GetValueSync().GetSession();
+ result = session2.ExecuteDataQuery(R"(
+ UPSERT INTO `/Root/TwoShard` (Key, Value1) VALUES(1, "NewValue2");
+ )", TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).GetValueSync();
+ AssertSuccessResult(result);
+
+ result = session1.ExecuteDataQuery(R"(
+ UPSERT INTO `/Root/TwoShard` (Key,Value1) VALUES(1, "NewValue");
+ )", TTxControl::Tx(*tx).CommitTx()).GetValueSync();
+ UNIT_ASSERT(!result.IsSuccess());
+ result.GetIssues().PrintTo(Cerr);
+ UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::ABORTED);
+ UNIT_ASSERT(HasIssue(result.GetIssues(), NYql::TIssuesIds::KIKIMR_LOCKS_INVALIDATED));
+
+ result = session2.ExecuteDataQuery(R"(
+ SELECT Key, Value1 FROM `/Root/TwoShard` WHERE Key <= 1;
+ )", TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).GetValueSync();
+ AssertSuccessResult(result);
+
+ CompareYson(R"([[[1u];["NewValue2"]]])",
+ FormatResultSetYson(result.GetResultSet(0)));
+ }
}
} // namespace NKikimr::NKqp
diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto
index 450c6635cf..08820a7718 100644
--- a/ydb/core/protos/tx_datashard.proto
+++ b/ydb/core/protos/tx_datashard.proto
@@ -240,6 +240,10 @@ message TKqpTransaction {
optional bool UseGenericReadSets = 7 [default = false];
}
+message TEvKqpInputActorResultInfo {
+ repeated TLock Locks = 1;
+}
+
message TKqpReadRangesSourceSettings {
optional TKqpTransaction.TTableMeta Table = 1;
@@ -257,6 +261,9 @@ message TKqpReadRangesSourceSettings {
optional NKikimrProto.TRowVersion Snapshot = 10;
optional uint64 ShardIdHint = 11;
optional bool Sorted = 12;
+
+ optional uint64 LockTxId = 13;
+ optional uint32 LockNodeId = 14;
}
message TKqpTaskInfo {
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h
index 8e45f4f40f..d7245f1632 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h
@@ -87,6 +87,8 @@ struct IDqComputeActorAsyncInput {
return 0;
}
+ virtual TMaybe<google::protobuf::Any> ExtraData() { return {}; }
+
// The same signature as IActor::PassAway().
// It is guaranted that this method will be called with bound MKQL allocator.
// So, it is the right place to destroy all internal UnboxedValues.
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
index 49b3998213..3a8976f1ec 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
@@ -577,11 +577,31 @@ protected:
Terminate(success, TIssues({TIssue(message)}));
}
+ void FillExtraData(NDqProto::TEvComputeActorState& state) {
+ auto* extraData = state.MutableExtraData();
+ for (auto& [index, input] : SourcesMap) {
+ if (auto data = input.AsyncInput->ExtraData()) {
+ auto* entry = extraData->AddSourcesExtraData();
+ entry->SetIndex(index);
+ entry->MutableData()->CopyFrom(*data);
+ }
+ }
+ for (auto& [index, input] : SourcesMap) {
+ if (auto data = input.AsyncInput->ExtraData()) {
+ auto* entry = extraData->AddInputTransformsData();
+ entry->SetIndex(index);
+ entry->MutableData()->CopyFrom(*data);
+ }
+ }
+ }
+
void ReportStateAndMaybeDie(NYql::NDqProto::StatusIds::StatusCode statusCode, const TIssues& issues)
{
auto execEv = MakeHolder<TEvDqCompute::TEvState>();
auto& record = execEv->Record;
+ FillExtraData(record);
+
record.SetState(State);
record.SetStatusCode(statusCode);
record.SetTaskId(Task.GetId());
diff --git a/ydb/library/yql/dq/actors/protos/dq_events.proto b/ydb/library/yql/dq/actors/protos/dq_events.proto
index 3baf18902e..e4d0e201ca 100644
--- a/ydb/library/yql/dq/actors/protos/dq_events.proto
+++ b/ydb/library/yql/dq/actors/protos/dq_events.proto
@@ -1,6 +1,7 @@
package NYql.NDqProto;
option cc_enable_arenas = true;
+import "google/protobuf/any.proto";
import "library/cpp/actors/protos/actors.proto";
import "ydb/library/yql/dq/actors/protos/dq_stats.proto";
import "ydb/library/yql/dq/actors/protos/dq_status_codes.proto";
@@ -54,6 +55,16 @@ message TComputeActorStats {
reserved 2; //optional NKqpProto.TKqpStatsRun LegacyStats = 2;
};
+message TExtraInputData {
+ optional uint32 Index = 1;
+ optional google.protobuf.Any Data = 2;
+}
+
+message TComputeActorExtraData {
+ repeated TExtraInputData SourcesExtraData = 1;
+ repeated TExtraInputData InputTransformsData = 2;
+}
+
message TEvComputeActorState {
optional uint32 State = 1; // == EComputeState
optional TComputeActorStats LegacyStats = 2;
@@ -62,6 +73,7 @@ message TEvComputeActorState {
optional uint64 TaskId = 5;
optional TDqComputeActorStats Stats = 6;
optional NYql.NDqProto.StatusIds.StatusCode StatusCode = 7;
+ optional TComputeActorExtraData ExtraData = 8;
};
message TEvComputeStateRequest {