aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorulya-sidorina <yulia@ydb.tech>2023-01-26 16:37:49 +0300
committerulya-sidorina <yulia@ydb.tech>2023-01-26 16:37:49 +0300
commiteab9392c527647bbcce377bf3bb3d1cebdf4f70c (patch)
tree1b7afb8eb0218920900cad5379609fb47d198e63
parent2e8c6e34414c6add3df070d18db938029606b968 (diff)
downloadydb-eab9392c527647bbcce377bf3bb3d1cebdf4f70c.tar.gz
send broken locks from read actors to executer
fix(kqp): send broken locks to executer
-rw-r--r--ydb/core/kqp/runtime/kqp_read_actor.cpp16
-rw-r--r--ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp39
-rw-r--r--ydb/core/kqp/ut/tx/kqp_mvcc_ut.cpp14
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h2
4 files changed, 49 insertions, 22 deletions
diff --git a/ydb/core/kqp/runtime/kqp_read_actor.cpp b/ydb/core/kqp/runtime/kqp_read_actor.cpp
index 1fb2fa24e56..2c20e32a4b9 100644
--- a/ydb/core/kqp/runtime/kqp_read_actor.cpp
+++ b/ydb/core/kqp/runtime/kqp_read_actor.cpp
@@ -576,7 +576,7 @@ public:
record.SetResultFormat(Settings.GetDataFormat());
- if (Settings.HasLockTxId()) {
+ if (Settings.HasLockTxId() && BrokenLocks.empty()) {
record.SetLockTxId(Settings.GetLockTxId());
}
@@ -603,11 +603,6 @@ public:
return;
}
- if (record.BrokenTxLocksSize()) {
- return RuntimeError("Broken locks", NYql::NDqProto::StatusIds::ABORTED,
- {YqlIssue({}, TIssuesIds::KIKIMR_LOCKS_INVALIDATED, "Transaction locks on " + Settings.GetTable().GetTablePath() + " invalidated.")});
- }
-
if (record.GetStatus().GetCode() != Ydb::StatusIds::SUCCESS) {
for (auto& issue : record.GetStatus().GetIssues()) {
CA_LOG_D("read id #" << id << " got issue " << issue.Getmessage());
@@ -617,6 +612,11 @@ public:
for (auto& lock : record.GetTxLocks()) {
Locks.push_back(lock);
}
+
+ for (auto& lock : record.GetBrokenTxLocks()) {
+ BrokenLocks.push_back(lock);
+ }
+
CA_LOG_D("Taken " << Locks.size() << " locks");
Reads[id].SerializedContinuationToken = record.GetContinuationToken();
@@ -856,6 +856,9 @@ public:
for (auto& lock : Locks) {
resultInfo.AddLocks()->CopyFrom(lock);
}
+ for (auto& lock : BrokenLocks) {
+ resultInfo.AddLocks()->CopyFrom(lock);
+ }
result.PackFrom(resultInfo);
return result;
}
@@ -893,6 +896,7 @@ private:
TQueue<TResult> Results;
TVector<NKikimrTxDataShard::TLock> Locks;
+ TVector<NKikimrTxDataShard::TLock> BrokenLocks;
ui32 MaxInFlight = 1024;
const TString LogPrefix;
diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
index f867e92ce5d..564c1b01591 100644
--- a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
+++ b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
@@ -10,6 +10,7 @@
#include <ydb/core/protos/kqp.pb.h>
#include <ydb/core/tx/scheme_cache/scheme_cache.h>
#include <ydb/core/tx/datashard/datashard.h>
+#include <ydb/core/kqp/common/kqp_event_ids.h>
namespace NKikimr {
namespace NKqp {
@@ -182,6 +183,11 @@ private:
for (auto& lock : Locks) {
resultInfo.AddLocks()->CopyFrom(lock);
}
+
+ for (auto& lock : BrokenLocks) {
+ resultInfo.AddLocks()->CopyFrom(lock);
+ }
+
result.PackFrom(resultInfo);
return result;
}
@@ -245,16 +251,17 @@ private:
return;
}
- if (record.BrokenTxLocksSize()) {
- return RuntimeError("Transaction locks invalidated.", NYql::NDqProto::StatusIds::ABORTED);
+ for (auto& lock : record.GetBrokenTxLocks()) {
+ BrokenLocks.push_back(lock);
+ }
+
+ for (auto& lock : record.GetTxLocks()) {
+ Locks.push_back(lock);
}
if (!Snapshot.IsValid() && !record.GetFinished()) {
// HEAD read was converted to repeatable read
Snapshot = IKqpGateway::TKqpSnapshot(record.GetSnapshot().GetStep(), record.GetSnapshot().GetTxId());
- } else if (Snapshot.IsValid()) {
- YQL_ENSURE(record.GetSnapshot().GetStep() == Snapshot.Step && record.GetSnapshot().GetTxId() == Snapshot.TxId,
- "Snapshot version mismatch");
}
// TODO: refactor after KIKIMR-15102
@@ -267,10 +274,6 @@ private:
return RetryTableRead(read, continuationToken);
}
- for (auto& lock : record.GetTxLocks()) {
- Locks.push_back(lock);
- }
-
YQL_ENSURE(record.GetResultFormat() == NKikimrTxDataShard::EScanDataFormat::CELLVEC);
auto nrows = ev->Get()->GetRowsCount();
for (ui64 rowId = 0; rowId < nrows; ++rowId) {
@@ -388,12 +391,21 @@ private:
YQL_ENSURE(TableScheme);
YQL_ENSURE(KeyPrefixColumns.size() <= TableScheme->KeyColumnTypes.size());
+ TVector<i32> keyColumnOrder;
+ keyColumnOrder.reserve(KeyPrefixColumns.size());
+ for (const auto& keyColumn : KeyPrefixColumns) {
+ auto it = TableScheme->ColumnsByName.find(keyColumn);
+ YQL_ENSURE(it != TableScheme->ColumnsByName.end());
+ keyColumnOrder.push_back(it->second.KeyOrder);
+ }
+
NUdf::EFetchStatus status;
NUdf::TUnboxedValue key;
while ((status = Input.Fetch(key)) == NUdf::EFetchStatus::Ok) {
std::vector<TCell> keyCells(KeyPrefixColumns.size());
for (ui32 colId = 0; colId < KeyPrefixColumns.size(); ++colId) {
- keyCells[colId] = MakeCell(TableScheme->KeyColumnTypes[colId], key.GetElement(colId), TypeEnv, /* copy */ true);
+ keyCells[keyColumnOrder[colId]] = MakeCell(TableScheme->KeyColumnTypes[keyColumnOrder[colId]],
+ key.GetElement(colId), TypeEnv, /* copy */ true);
}
UnprocessedKeys.emplace_back(std::move(keyCells));
@@ -487,7 +499,7 @@ private:
YQL_ENSURE(ImmediateTx, "HEAD reading is only available for immediate txs");
}
- if (LockTxId) {
+ if (LockTxId && BrokenLocks.empty()) {
record.SetLockTxId(*LockTxId);
}
@@ -523,7 +535,8 @@ private:
void RetryTableRead(TReadState& failedRead, NKikimrTxDataShard::TReadContinuationToken& token) {
YQL_ENSURE(token.GetFirstUnprocessedQuery() <= failedRead.Keys.size());
- std::vector<TOwnedTableRange> unprocessedKeys(failedRead.Keys.size() - token.GetFirstUnprocessedQuery());
+ std::vector<TOwnedTableRange> unprocessedKeys;
+ unprocessedKeys.reserve(failedRead.Keys.size() - token.GetFirstUnprocessedQuery());
for (ui64 idx = token.GetFirstUnprocessedQuery(); idx < failedRead.Keys.size(); ++idx) {
unprocessedKeys.emplace_back(std::move(failedRead.Keys[idx]));
}
@@ -626,8 +639,8 @@ private:
const TDuration SchemeCacheRequestTimeout;
NActors::TActorId SchemeCacheRequestTimeoutTimer;
const TDuration RetryReadTimeout;
-
TVector<NKikimrTxDataShard::TLock> Locks;
+ TVector<NKikimrTxDataShard::TLock> BrokenLocks;
};
} // namespace
diff --git a/ydb/core/kqp/ut/tx/kqp_mvcc_ut.cpp b/ydb/core/kqp/ut/tx/kqp_mvcc_ut.cpp
index 365d1ac69f7..d1559a7a353 100644
--- a/ydb/core/kqp/ut/tx/kqp_mvcc_ut.cpp
+++ b/ydb/core/kqp/ut/tx/kqp_mvcc_ut.cpp
@@ -62,7 +62,12 @@ Y_UNIT_TEST_SUITE(KqpSnapshotRead) {
}
Y_UNIT_TEST(ReadOnlyTxCommitsOnConcurrentWrite) {
- TKikimrRunner kikimr(TKikimrSettings().SetEnableMvcc(true).SetEnableMvccSnapshotReads(true));
+ TKikimrRunner kikimr(
+ TKikimrSettings()
+ .SetEnableMvcc(true)
+ .SetEnableMvccSnapshotReads(true)
+ .SetEnableKqpDataQueryStreamLookup(true)
+ );
// kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_COMPUTE, NActors::NLog::PRI_DEBUG);
// kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_BLOBS_STORAGE, NActors::NLog::PRI_DEBUG);
@@ -261,7 +266,12 @@ Y_UNIT_TEST_SUITE(KqpSnapshotRead) {
}
Y_UNIT_TEST(ReadWriteTxFailsOnConcurrentWrite3) {
- TKikimrRunner kikimr(TKikimrSettings().SetEnableMvcc(true).SetEnableMvccSnapshotReads(true));
+ TKikimrRunner kikimr(
+ TKikimrSettings()
+ .SetEnableMvcc(true)
+ .SetEnableMvccSnapshotReads(true)
+ .SetEnableKqpDataQueryStreamLookup(true)
+ );
// kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_COMPUTE, NActors::NLog::PRI_DEBUG);
// kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_BLOBS_STORAGE, NActors::NLog::PRI_DEBUG);
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
index 3a8976f1eca..e51ae3ae14f 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
@@ -586,7 +586,7 @@ protected:
entry->MutableData()->CopyFrom(*data);
}
}
- for (auto& [index, input] : SourcesMap) {
+ for (auto& [index, input] : InputTransformsMap) {
if (auto data = input.AsyncInput->ExtraData()) {
auto* entry = extraData->AddInputTransformsData();
entry->SetIndex(index);