diff options
author | ulya-sidorina <yulia@ydb.tech> | 2023-01-26 16:37:49 +0300 |
---|---|---|
committer | ulya-sidorina <yulia@ydb.tech> | 2023-01-26 16:37:49 +0300 |
commit | eab9392c527647bbcce377bf3bb3d1cebdf4f70c (patch) | |
tree | 1b7afb8eb0218920900cad5379609fb47d198e63 | |
parent | 2e8c6e34414c6add3df070d18db938029606b968 (diff) | |
download | ydb-eab9392c527647bbcce377bf3bb3d1cebdf4f70c.tar.gz |
send broken locks from read actors to executer
fix(kqp): send broken locks to executer
-rw-r--r-- | ydb/core/kqp/runtime/kqp_read_actor.cpp | 16 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp | 39 | ||||
-rw-r--r-- | ydb/core/kqp/ut/tx/kqp_mvcc_ut.cpp | 14 | ||||
-rw-r--r-- | ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h | 2 |
4 files changed, 49 insertions, 22 deletions
diff --git a/ydb/core/kqp/runtime/kqp_read_actor.cpp b/ydb/core/kqp/runtime/kqp_read_actor.cpp index 1fb2fa24e56..2c20e32a4b9 100644 --- a/ydb/core/kqp/runtime/kqp_read_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_read_actor.cpp @@ -576,7 +576,7 @@ public: record.SetResultFormat(Settings.GetDataFormat()); - if (Settings.HasLockTxId()) { + if (Settings.HasLockTxId() && BrokenLocks.empty()) { record.SetLockTxId(Settings.GetLockTxId()); } @@ -603,11 +603,6 @@ public: return; } - if (record.BrokenTxLocksSize()) { - return RuntimeError("Broken locks", NYql::NDqProto::StatusIds::ABORTED, - {YqlIssue({}, TIssuesIds::KIKIMR_LOCKS_INVALIDATED, "Transaction locks on " + Settings.GetTable().GetTablePath() + " invalidated.")}); - } - if (record.GetStatus().GetCode() != Ydb::StatusIds::SUCCESS) { for (auto& issue : record.GetStatus().GetIssues()) { CA_LOG_D("read id #" << id << " got issue " << issue.Getmessage()); @@ -617,6 +612,11 @@ public: for (auto& lock : record.GetTxLocks()) { Locks.push_back(lock); } + + for (auto& lock : record.GetBrokenTxLocks()) { + BrokenLocks.push_back(lock); + } + CA_LOG_D("Taken " << Locks.size() << " locks"); Reads[id].SerializedContinuationToken = record.GetContinuationToken(); @@ -856,6 +856,9 @@ public: for (auto& lock : Locks) { resultInfo.AddLocks()->CopyFrom(lock); } + for (auto& lock : BrokenLocks) { + resultInfo.AddLocks()->CopyFrom(lock); + } result.PackFrom(resultInfo); return result; } @@ -893,6 +896,7 @@ private: TQueue<TResult> Results; TVector<NKikimrTxDataShard::TLock> Locks; + TVector<NKikimrTxDataShard::TLock> BrokenLocks; ui32 MaxInFlight = 1024; const TString LogPrefix; diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp index f867e92ce5d..564c1b01591 100644 --- a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp @@ -10,6 +10,7 @@ #include <ydb/core/protos/kqp.pb.h> #include <ydb/core/tx/scheme_cache/scheme_cache.h> #include <ydb/core/tx/datashard/datashard.h> +#include <ydb/core/kqp/common/kqp_event_ids.h> namespace NKikimr { namespace NKqp { @@ -182,6 +183,11 @@ private: for (auto& lock : Locks) { resultInfo.AddLocks()->CopyFrom(lock); } + + for (auto& lock : BrokenLocks) { + resultInfo.AddLocks()->CopyFrom(lock); + } + result.PackFrom(resultInfo); return result; } @@ -245,16 +251,17 @@ private: return; } - if (record.BrokenTxLocksSize()) { - return RuntimeError("Transaction locks invalidated.", NYql::NDqProto::StatusIds::ABORTED); + for (auto& lock : record.GetBrokenTxLocks()) { + BrokenLocks.push_back(lock); + } + + for (auto& lock : record.GetTxLocks()) { + Locks.push_back(lock); } if (!Snapshot.IsValid() && !record.GetFinished()) { // HEAD read was converted to repeatable read Snapshot = IKqpGateway::TKqpSnapshot(record.GetSnapshot().GetStep(), record.GetSnapshot().GetTxId()); - } else if (Snapshot.IsValid()) { - YQL_ENSURE(record.GetSnapshot().GetStep() == Snapshot.Step && record.GetSnapshot().GetTxId() == Snapshot.TxId, - "Snapshot version mismatch"); } // TODO: refactor after KIKIMR-15102 @@ -267,10 +274,6 @@ private: return RetryTableRead(read, continuationToken); } - for (auto& lock : record.GetTxLocks()) { - Locks.push_back(lock); - } - YQL_ENSURE(record.GetResultFormat() == NKikimrTxDataShard::EScanDataFormat::CELLVEC); auto nrows = ev->Get()->GetRowsCount(); for (ui64 rowId = 0; rowId < nrows; ++rowId) { @@ -388,12 +391,21 @@ private: YQL_ENSURE(TableScheme); YQL_ENSURE(KeyPrefixColumns.size() <= TableScheme->KeyColumnTypes.size()); + TVector<i32> keyColumnOrder; + keyColumnOrder.reserve(KeyPrefixColumns.size()); + for (const auto& keyColumn : KeyPrefixColumns) { + auto it = TableScheme->ColumnsByName.find(keyColumn); + YQL_ENSURE(it != TableScheme->ColumnsByName.end()); + keyColumnOrder.push_back(it->second.KeyOrder); + } + NUdf::EFetchStatus status; NUdf::TUnboxedValue key; while ((status = Input.Fetch(key)) == NUdf::EFetchStatus::Ok) { std::vector<TCell> keyCells(KeyPrefixColumns.size()); for (ui32 colId = 0; colId < KeyPrefixColumns.size(); ++colId) { - keyCells[colId] = MakeCell(TableScheme->KeyColumnTypes[colId], key.GetElement(colId), TypeEnv, /* copy */ true); + keyCells[keyColumnOrder[colId]] = MakeCell(TableScheme->KeyColumnTypes[keyColumnOrder[colId]], + key.GetElement(colId), TypeEnv, /* copy */ true); } UnprocessedKeys.emplace_back(std::move(keyCells)); @@ -487,7 +499,7 @@ private: YQL_ENSURE(ImmediateTx, "HEAD reading is only available for immediate txs"); } - if (LockTxId) { + if (LockTxId && BrokenLocks.empty()) { record.SetLockTxId(*LockTxId); } @@ -523,7 +535,8 @@ private: void RetryTableRead(TReadState& failedRead, NKikimrTxDataShard::TReadContinuationToken& token) { YQL_ENSURE(token.GetFirstUnprocessedQuery() <= failedRead.Keys.size()); - std::vector<TOwnedTableRange> unprocessedKeys(failedRead.Keys.size() - token.GetFirstUnprocessedQuery()); + std::vector<TOwnedTableRange> unprocessedKeys; + unprocessedKeys.reserve(failedRead.Keys.size() - token.GetFirstUnprocessedQuery()); for (ui64 idx = token.GetFirstUnprocessedQuery(); idx < failedRead.Keys.size(); ++idx) { unprocessedKeys.emplace_back(std::move(failedRead.Keys[idx])); } @@ -626,8 +639,8 @@ private: const TDuration SchemeCacheRequestTimeout; NActors::TActorId SchemeCacheRequestTimeoutTimer; const TDuration RetryReadTimeout; - TVector<NKikimrTxDataShard::TLock> Locks; + TVector<NKikimrTxDataShard::TLock> BrokenLocks; }; } // namespace diff --git a/ydb/core/kqp/ut/tx/kqp_mvcc_ut.cpp b/ydb/core/kqp/ut/tx/kqp_mvcc_ut.cpp index 365d1ac69f7..d1559a7a353 100644 --- a/ydb/core/kqp/ut/tx/kqp_mvcc_ut.cpp +++ b/ydb/core/kqp/ut/tx/kqp_mvcc_ut.cpp @@ -62,7 +62,12 @@ Y_UNIT_TEST_SUITE(KqpSnapshotRead) { } Y_UNIT_TEST(ReadOnlyTxCommitsOnConcurrentWrite) { - TKikimrRunner kikimr(TKikimrSettings().SetEnableMvcc(true).SetEnableMvccSnapshotReads(true)); + TKikimrRunner kikimr( + TKikimrSettings() + .SetEnableMvcc(true) + .SetEnableMvccSnapshotReads(true) + .SetEnableKqpDataQueryStreamLookup(true) + ); // kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_COMPUTE, NActors::NLog::PRI_DEBUG); // kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_BLOBS_STORAGE, NActors::NLog::PRI_DEBUG); @@ -261,7 +266,12 @@ Y_UNIT_TEST_SUITE(KqpSnapshotRead) { } Y_UNIT_TEST(ReadWriteTxFailsOnConcurrentWrite3) { - TKikimrRunner kikimr(TKikimrSettings().SetEnableMvcc(true).SetEnableMvccSnapshotReads(true)); + TKikimrRunner kikimr( + TKikimrSettings() + .SetEnableMvcc(true) + .SetEnableMvccSnapshotReads(true) + .SetEnableKqpDataQueryStreamLookup(true) + ); // kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_COMPUTE, NActors::NLog::PRI_DEBUG); // kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_BLOBS_STORAGE, NActors::NLog::PRI_DEBUG); diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h index 3a8976f1eca..e51ae3ae14f 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h @@ -586,7 +586,7 @@ protected: entry->MutableData()->CopyFrom(*data); } } - for (auto& [index, input] : SourcesMap) { + for (auto& [index, input] : InputTransformsMap) { if (auto data = input.AsyncInput->ExtraData()) { auto* entry = extraData->AddInputTransformsData(); entry->SetIndex(index); |