diff options
author | ponasenko-rs <ponasenko-rs@yandex-team.com> | 2023-11-16 14:02:39 +0300 |
---|---|---|
committer | ponasenko-rs <ponasenko-rs@yandex-team.com> | 2023-11-16 14:54:41 +0300 |
commit | dfff32a40242f1c6be37ffc69620c5642473234c (patch) | |
tree | 5b96125984a127db54ca91cb408d8e71804a90de /yt | |
parent | a2fd00a24fcb20211418b797cb6191c4f1afa6ea (diff) | |
download | ydb-dfff32a40242f1c6be37ffc69620c5642473234c.tar.gz |
YT-19441: Add shared write locks.
Diffstat (limited to 'yt')
-rw-r--r-- | yt/yt/client/api/transaction.cpp | 36 | ||||
-rw-r--r-- | yt/yt/client/api/transaction.h | 3 | ||||
-rw-r--r-- | yt/yt/client/driver/table_commands.cpp | 6 | ||||
-rw-r--r-- | yt/yt/client/driver/table_commands.h | 1 | ||||
-rw-r--r-- | yt/yt/client/table_client/schema-inl.h | 10 | ||||
-rw-r--r-- | yt/yt/client/table_client/schema.cpp | 28 | ||||
-rw-r--r-- | yt/yt/client/table_client/schema.h | 7 | ||||
-rw-r--r-- | yt/yt/client/table_client/unversioned_row.cpp | 48 | ||||
-rw-r--r-- | yt/yt/client/table_client/unversioned_row.h | 10 | ||||
-rw-r--r-- | yt/yt/client/tablet_client/table_mount_cache.h | 3 |
10 files changed, 146 insertions, 6 deletions
diff --git a/yt/yt/client/api/transaction.cpp b/yt/yt/client/api/transaction.cpp index deec6dccd1..c6e2698af3 100644 --- a/yt/yt/client/api/transaction.cpp +++ b/yt/yt/client/api/transaction.cpp @@ -21,13 +21,43 @@ void ITransaction::WriteRows( const NYPath::TYPath& path, TNameTablePtr nameTable, TSharedRange<TUnversionedRow> rows, - const TModifyRowsOptions& options) + const TModifyRowsOptions& options, + ELockType lockType) { + THROW_ERROR_EXCEPTION_IF(!IsWriteLock(lockType), "Inappropriate lock type %Qv given for write operation", + lockType); + std::vector<TRowModification> modifications; modifications.reserve(rows.Size()); - for (auto row : rows) { - modifications.push_back({ERowModificationType::Write, row.ToTypeErasedRow(), TLockMask()}); + if (lockType == ELockType::Exclusive) { + for (auto row : rows) { + modifications.push_back({ERowModificationType::Write, row.ToTypeErasedRow(), TLockMask()}); + } + } else { + // NB: This mount revision could differ from the one will be send to tablet node. + // However locks correctness will be checked in native transaction. + const auto& tableMountCache = GetClient()->GetTableMountCache(); + auto tableInfo = WaitFor(tableMountCache->GetTableInfo(path)) + .ValueOrThrow(); + + std::vector<int> columnIndexToLockIndex; + GetLocksMapping( + *tableInfo->Schemas[ETableSchemaKind::Write], + GetAtomicity() == NTransactionClient::EAtomicity::Full, + &columnIndexToLockIndex); + + for (auto row : rows) { + TLockMask lockMask; + for (int index = 0; index < static_cast<int>(row.GetCount()); ++index) { + auto lockIndex = columnIndexToLockIndex[row[index].Id]; + if (lockIndex != -1) { + lockMask.Set(lockIndex, lockType); + } + } + + modifications.push_back({ERowModificationType::WriteAndLock, row.ToTypeErasedRow(), lockMask}); + } } ModifyRows( diff --git a/yt/yt/client/api/transaction.h b/yt/yt/client/api/transaction.h index 2d62e5bbaa..ede9a31b06 100644 --- a/yt/yt/client/api/transaction.h +++ b/yt/yt/client/api/transaction.h @@ -193,7 +193,8 @@ struct ITransaction const NYPath::TYPath& path, NTableClient::TNameTablePtr nameTable, TSharedRange<NTableClient::TUnversionedRow> rows, - const TModifyRowsOptions& options = {}); + const TModifyRowsOptions& options = {}, + NTableClient::ELockType lockType = NTableClient::ELockType::Exclusive); void WriteRows( const NYPath::TYPath& path, diff --git a/yt/yt/client/driver/table_commands.cpp b/yt/yt/client/driver/table_commands.cpp index 1bd171b1ff..c9ac75001d 100644 --- a/yt/yt/client/driver/table_commands.cpp +++ b/yt/yt/client/driver/table_commands.cpp @@ -897,6 +897,9 @@ void TInsertRowsCommand::Register(TRegistrar registrar) return command->Options.AllowMissingKeyColumns; }) .Default(false); + + registrar.Parameter("lock_type", &TThis::LockType) + .Default(ELockType::Exclusive); } void TInsertRowsCommand::DoExecute(ICommandContextPtr context) @@ -944,7 +947,8 @@ void TInsertRowsCommand::DoExecute(ICommandContextPtr context) Path.GetPath(), valueConsumer.GetNameTable(), std::move(rowRange), - Options); + Options, + LockType); if (ShouldCommitTransaction()) { WaitFor(transaction->Commit()) diff --git a/yt/yt/client/driver/table_commands.h b/yt/yt/client/driver/table_commands.h index 1b6f377ed8..4587063dff 100644 --- a/yt/yt/client/driver/table_commands.h +++ b/yt/yt/client/driver/table_commands.h @@ -346,6 +346,7 @@ private: NYPath::TRichYPath Path; bool Update; bool Aggregate; + NTableClient::ELockType LockType; void DoExecute(ICommandContextPtr context) override; }; diff --git a/yt/yt/client/table_client/schema-inl.h b/yt/yt/client/table_client/schema-inl.h index 7e9c1590c1..364ff584c0 100644 --- a/yt/yt/client/table_client/schema-inl.h +++ b/yt/yt/client/table_client/schema-inl.h @@ -131,6 +131,16 @@ inline bool TLockMask::HasNewLocks() const return false; } +inline bool TLockMask::IsNone() const { + for (int index = 0; index < GetSize(); ++index) { + if (Get(index) != ELockType::None) { + return false; + } + } + + return true; +} + inline void TLockMask::Reserve(int size) { YT_VERIFY(size < MaxSize); diff --git a/yt/yt/client/table_client/schema.cpp b/yt/yt/client/table_client/schema.cpp index 7e77f94a61..80b3193a2b 100644 --- a/yt/yt/client/table_client/schema.cpp +++ b/yt/yt/client/table_client/schema.cpp @@ -46,8 +46,10 @@ int GetLockPriority(ELockType lockType) return 1; case ELockType::SharedStrong: return 2; - case ELockType::Exclusive: + case ELockType::SharedWrite: return 3; + case ELockType::Exclusive: + return 4; default: YT_ABORT(); } @@ -57,8 +59,32 @@ int GetLockPriority(ELockType lockType) //////////////////////////////////////////////////////////////////////////////// +bool IsReadLock(ELockType lock) +{ + return lock == ELockType::SharedWeak || lock == ELockType::SharedStrong; +} + +bool IsWriteLock(ELockType lock) +{ + return lock == ELockType::Exclusive || lock == ELockType::SharedWrite; +} + ELockType GetStrongestLock(ELockType lhs, ELockType rhs) { + if (lhs == ELockType::None) { + return rhs; + } + + if (rhs == ELockType::None) { + return lhs; + } + + if (IsReadLock(lhs) && IsWriteLock(rhs) || + IsReadLock(rhs) && IsWriteLock(lhs)) + { + return ELockType::Exclusive; + } + return GetLockPriority(lhs) > GetLockPriority(rhs) ? lhs : rhs; } diff --git a/yt/yt/client/table_client/schema.h b/yt/yt/client/table_client/schema.h index f64484df29..da539ddcb7 100644 --- a/yt/yt/client/table_client/schema.h +++ b/yt/yt/client/table_client/schema.h @@ -23,11 +23,15 @@ DEFINE_ENUM(ELockType, ((SharedWeak) (1)) ((SharedStrong) (2)) ((Exclusive) (3)) + ((SharedWrite) (4)) ); // COMPAT(gritukan) constexpr ELockType MaxOldLockType = ELockType::Exclusive; +bool IsReadLock(ELockType lock); +bool IsWriteLock(ELockType lock); + ELockType GetStrongestLock(ELockType lhs, ELockType rhs); //////////////////////////////////////////////////////////////////////////////// @@ -76,6 +80,9 @@ public: TLegacyLockMask ToLegacyMask() const; bool HasNewLocks() const; + // NB: Has linear complexity. + bool IsNone() const; + static constexpr int BitsPerType = 4; static_assert(static_cast<int>(TEnumTraits<ELockType>::GetMaxValue()) < (1 << BitsPerType)); diff --git a/yt/yt/client/table_client/unversioned_row.cpp b/yt/yt/client/table_client/unversioned_row.cpp index 07062c2690..874c175d4d 100644 --- a/yt/yt/client/table_client/unversioned_row.cpp +++ b/yt/yt/client/table_client/unversioned_row.cpp @@ -1237,6 +1237,54 @@ void ValidateDuplicateAndRequiredValueColumns( } } +bool ValidateNonKeyColumnsAgainstLock( + TUnversionedRow row, + const TLockMask& locks, + const TTableSchema& schema, + const TNameTableToSchemaIdMapping& idMapping, + const TNameTablePtr nameTable, + const std::vector<int>& columnIndexToLockIndex, + bool allowSharedWriteLocks) +{ + bool hasNonKeyColumns = false; + for (const auto value : row) { + int mappedId = ApplyIdMapping(value, &idMapping); + if (mappedId < 0 || mappedId >= std::ssize(schema.Columns())) { + int size = nameTable->GetSize(); + if (value.Id < 0 || value.Id >= size) { + THROW_ERROR_EXCEPTION("Expected value id in range [0:%v] but got %v", + size - 1, + value.Id); + } + + THROW_ERROR_EXCEPTION("Unexpected column %Qv", + nameTable->GetName(value.Id)); + } + + auto lockIndex = columnIndexToLockIndex[mappedId]; + if (lockIndex == -1) { + continue; + } + + auto lockType = locks.Get(lockIndex); + + if (lockType == ELockType::SharedWrite && !allowSharedWriteLocks) { + THROW_ERROR_EXCEPTION("Shared write locks are not allowed for the table"); + } + + if (mappedId >= schema.GetKeyColumnCount()) { + hasNonKeyColumns = true; + + if (lockType != ELockType::Exclusive && lockType != ELockType::SharedWrite) { + THROW_ERROR_EXCEPTION("No write lock taken for column %Qv", + nameTable->GetName(value.Id)); + } + } + } + + return hasNonKeyColumns; +} + void ValidateClientKey(TLegacyKey key) { for (const auto& value : key) { diff --git a/yt/yt/client/table_client/unversioned_row.h b/yt/yt/client/table_client/unversioned_row.h index 7557836241..ccbb73e03f 100644 --- a/yt/yt/client/table_client/unversioned_row.h +++ b/yt/yt/client/table_client/unversioned_row.h @@ -444,6 +444,16 @@ void ValidateDuplicateAndRequiredValueColumns( const TNameTableToSchemaIdMapping& idMapping, std::vector<bool>* columnPresenceBuffer); +//! Checks that #row contains write lock for non-key columns and returns true if any non-key columns encountered. +bool ValidateNonKeyColumnsAgainstLock( + TUnversionedRow row, + const TLockMask& locks, + const TTableSchema& schema, + const TNameTableToSchemaIdMapping& idMapping, + const TNameTablePtr nameTable, + const std::vector<int>& columnIndexToLockIndex, + bool allowSharedWriteLocks); + //! Checks that #key is a valid client-side key. Throws on failure. /*! The components must pass #ValidateKeyValue check. */ void ValidateClientKey(TLegacyKey key); diff --git a/yt/yt/client/tablet_client/table_mount_cache.h b/yt/yt/client/tablet_client/table_mount_cache.h index 44fa398800..bba0a1e0c0 100644 --- a/yt/yt/client/tablet_client/table_mount_cache.h +++ b/yt/yt/client/tablet_client/table_mount_cache.h @@ -128,6 +128,9 @@ struct TTableMountInfo bool EnableDetailedProfiling = false; + // COMPAT(ponasenko-rs) + bool EnableSharedWriteLocks = false; + bool IsSorted() const; bool IsOrdered() const; bool IsReplicated() const; |