aboutsummaryrefslogtreecommitdiffstats
path: root/yt
diff options
context:
space:
mode:
authorponasenko-rs <ponasenko-rs@yandex-team.com>2023-11-16 14:02:39 +0300
committerponasenko-rs <ponasenko-rs@yandex-team.com>2023-11-16 14:54:41 +0300
commitdfff32a40242f1c6be37ffc69620c5642473234c (patch)
tree5b96125984a127db54ca91cb408d8e71804a90de /yt
parenta2fd00a24fcb20211418b797cb6191c4f1afa6ea (diff)
downloadydb-dfff32a40242f1c6be37ffc69620c5642473234c.tar.gz
YT-19441: Add shared write locks.
Diffstat (limited to 'yt')
-rw-r--r--yt/yt/client/api/transaction.cpp36
-rw-r--r--yt/yt/client/api/transaction.h3
-rw-r--r--yt/yt/client/driver/table_commands.cpp6
-rw-r--r--yt/yt/client/driver/table_commands.h1
-rw-r--r--yt/yt/client/table_client/schema-inl.h10
-rw-r--r--yt/yt/client/table_client/schema.cpp28
-rw-r--r--yt/yt/client/table_client/schema.h7
-rw-r--r--yt/yt/client/table_client/unversioned_row.cpp48
-rw-r--r--yt/yt/client/table_client/unversioned_row.h10
-rw-r--r--yt/yt/client/tablet_client/table_mount_cache.h3
10 files changed, 146 insertions, 6 deletions
diff --git a/yt/yt/client/api/transaction.cpp b/yt/yt/client/api/transaction.cpp
index deec6dccd1..c6e2698af3 100644
--- a/yt/yt/client/api/transaction.cpp
+++ b/yt/yt/client/api/transaction.cpp
@@ -21,13 +21,43 @@ void ITransaction::WriteRows(
const NYPath::TYPath& path,
TNameTablePtr nameTable,
TSharedRange<TUnversionedRow> rows,
- const TModifyRowsOptions& options)
+ const TModifyRowsOptions& options,
+ ELockType lockType)
{
+ THROW_ERROR_EXCEPTION_IF(!IsWriteLock(lockType), "Inappropriate lock type %Qv given for write operation",
+ lockType);
+
std::vector<TRowModification> modifications;
modifications.reserve(rows.Size());
- for (auto row : rows) {
- modifications.push_back({ERowModificationType::Write, row.ToTypeErasedRow(), TLockMask()});
+ if (lockType == ELockType::Exclusive) {
+ for (auto row : rows) {
+ modifications.push_back({ERowModificationType::Write, row.ToTypeErasedRow(), TLockMask()});
+ }
+ } else {
+ // NB: This mount revision could differ from the one will be send to tablet node.
+ // However locks correctness will be checked in native transaction.
+ const auto& tableMountCache = GetClient()->GetTableMountCache();
+ auto tableInfo = WaitFor(tableMountCache->GetTableInfo(path))
+ .ValueOrThrow();
+
+ std::vector<int> columnIndexToLockIndex;
+ GetLocksMapping(
+ *tableInfo->Schemas[ETableSchemaKind::Write],
+ GetAtomicity() == NTransactionClient::EAtomicity::Full,
+ &columnIndexToLockIndex);
+
+ for (auto row : rows) {
+ TLockMask lockMask;
+ for (int index = 0; index < static_cast<int>(row.GetCount()); ++index) {
+ auto lockIndex = columnIndexToLockIndex[row[index].Id];
+ if (lockIndex != -1) {
+ lockMask.Set(lockIndex, lockType);
+ }
+ }
+
+ modifications.push_back({ERowModificationType::WriteAndLock, row.ToTypeErasedRow(), lockMask});
+ }
}
ModifyRows(
diff --git a/yt/yt/client/api/transaction.h b/yt/yt/client/api/transaction.h
index 2d62e5bbaa..ede9a31b06 100644
--- a/yt/yt/client/api/transaction.h
+++ b/yt/yt/client/api/transaction.h
@@ -193,7 +193,8 @@ struct ITransaction
const NYPath::TYPath& path,
NTableClient::TNameTablePtr nameTable,
TSharedRange<NTableClient::TUnversionedRow> rows,
- const TModifyRowsOptions& options = {});
+ const TModifyRowsOptions& options = {},
+ NTableClient::ELockType lockType = NTableClient::ELockType::Exclusive);
void WriteRows(
const NYPath::TYPath& path,
diff --git a/yt/yt/client/driver/table_commands.cpp b/yt/yt/client/driver/table_commands.cpp
index 1bd171b1ff..c9ac75001d 100644
--- a/yt/yt/client/driver/table_commands.cpp
+++ b/yt/yt/client/driver/table_commands.cpp
@@ -897,6 +897,9 @@ void TInsertRowsCommand::Register(TRegistrar registrar)
return command->Options.AllowMissingKeyColumns;
})
.Default(false);
+
+ registrar.Parameter("lock_type", &TThis::LockType)
+ .Default(ELockType::Exclusive);
}
void TInsertRowsCommand::DoExecute(ICommandContextPtr context)
@@ -944,7 +947,8 @@ void TInsertRowsCommand::DoExecute(ICommandContextPtr context)
Path.GetPath(),
valueConsumer.GetNameTable(),
std::move(rowRange),
- Options);
+ Options,
+ LockType);
if (ShouldCommitTransaction()) {
WaitFor(transaction->Commit())
diff --git a/yt/yt/client/driver/table_commands.h b/yt/yt/client/driver/table_commands.h
index 1b6f377ed8..4587063dff 100644
--- a/yt/yt/client/driver/table_commands.h
+++ b/yt/yt/client/driver/table_commands.h
@@ -346,6 +346,7 @@ private:
NYPath::TRichYPath Path;
bool Update;
bool Aggregate;
+ NTableClient::ELockType LockType;
void DoExecute(ICommandContextPtr context) override;
};
diff --git a/yt/yt/client/table_client/schema-inl.h b/yt/yt/client/table_client/schema-inl.h
index 7e9c1590c1..364ff584c0 100644
--- a/yt/yt/client/table_client/schema-inl.h
+++ b/yt/yt/client/table_client/schema-inl.h
@@ -131,6 +131,16 @@ inline bool TLockMask::HasNewLocks() const
return false;
}
+inline bool TLockMask::IsNone() const {
+ for (int index = 0; index < GetSize(); ++index) {
+ if (Get(index) != ELockType::None) {
+ return false;
+ }
+ }
+
+ return true;
+}
+
inline void TLockMask::Reserve(int size)
{
YT_VERIFY(size < MaxSize);
diff --git a/yt/yt/client/table_client/schema.cpp b/yt/yt/client/table_client/schema.cpp
index 7e77f94a61..80b3193a2b 100644
--- a/yt/yt/client/table_client/schema.cpp
+++ b/yt/yt/client/table_client/schema.cpp
@@ -46,8 +46,10 @@ int GetLockPriority(ELockType lockType)
return 1;
case ELockType::SharedStrong:
return 2;
- case ELockType::Exclusive:
+ case ELockType::SharedWrite:
return 3;
+ case ELockType::Exclusive:
+ return 4;
default:
YT_ABORT();
}
@@ -57,8 +59,32 @@ int GetLockPriority(ELockType lockType)
////////////////////////////////////////////////////////////////////////////////
+bool IsReadLock(ELockType lock)
+{
+ return lock == ELockType::SharedWeak || lock == ELockType::SharedStrong;
+}
+
+bool IsWriteLock(ELockType lock)
+{
+ return lock == ELockType::Exclusive || lock == ELockType::SharedWrite;
+}
+
ELockType GetStrongestLock(ELockType lhs, ELockType rhs)
{
+ if (lhs == ELockType::None) {
+ return rhs;
+ }
+
+ if (rhs == ELockType::None) {
+ return lhs;
+ }
+
+ if (IsReadLock(lhs) && IsWriteLock(rhs) ||
+ IsReadLock(rhs) && IsWriteLock(lhs))
+ {
+ return ELockType::Exclusive;
+ }
+
return GetLockPriority(lhs) > GetLockPriority(rhs) ? lhs : rhs;
}
diff --git a/yt/yt/client/table_client/schema.h b/yt/yt/client/table_client/schema.h
index f64484df29..da539ddcb7 100644
--- a/yt/yt/client/table_client/schema.h
+++ b/yt/yt/client/table_client/schema.h
@@ -23,11 +23,15 @@ DEFINE_ENUM(ELockType,
((SharedWeak) (1))
((SharedStrong) (2))
((Exclusive) (3))
+ ((SharedWrite) (4))
);
// COMPAT(gritukan)
constexpr ELockType MaxOldLockType = ELockType::Exclusive;
+bool IsReadLock(ELockType lock);
+bool IsWriteLock(ELockType lock);
+
ELockType GetStrongestLock(ELockType lhs, ELockType rhs);
////////////////////////////////////////////////////////////////////////////////
@@ -76,6 +80,9 @@ public:
TLegacyLockMask ToLegacyMask() const;
bool HasNewLocks() const;
+ // NB: Has linear complexity.
+ bool IsNone() const;
+
static constexpr int BitsPerType = 4;
static_assert(static_cast<int>(TEnumTraits<ELockType>::GetMaxValue()) < (1 << BitsPerType));
diff --git a/yt/yt/client/table_client/unversioned_row.cpp b/yt/yt/client/table_client/unversioned_row.cpp
index 07062c2690..874c175d4d 100644
--- a/yt/yt/client/table_client/unversioned_row.cpp
+++ b/yt/yt/client/table_client/unversioned_row.cpp
@@ -1237,6 +1237,54 @@ void ValidateDuplicateAndRequiredValueColumns(
}
}
+bool ValidateNonKeyColumnsAgainstLock(
+ TUnversionedRow row,
+ const TLockMask& locks,
+ const TTableSchema& schema,
+ const TNameTableToSchemaIdMapping& idMapping,
+ const TNameTablePtr nameTable,
+ const std::vector<int>& columnIndexToLockIndex,
+ bool allowSharedWriteLocks)
+{
+ bool hasNonKeyColumns = false;
+ for (const auto value : row) {
+ int mappedId = ApplyIdMapping(value, &idMapping);
+ if (mappedId < 0 || mappedId >= std::ssize(schema.Columns())) {
+ int size = nameTable->GetSize();
+ if (value.Id < 0 || value.Id >= size) {
+ THROW_ERROR_EXCEPTION("Expected value id in range [0:%v] but got %v",
+ size - 1,
+ value.Id);
+ }
+
+ THROW_ERROR_EXCEPTION("Unexpected column %Qv",
+ nameTable->GetName(value.Id));
+ }
+
+ auto lockIndex = columnIndexToLockIndex[mappedId];
+ if (lockIndex == -1) {
+ continue;
+ }
+
+ auto lockType = locks.Get(lockIndex);
+
+ if (lockType == ELockType::SharedWrite && !allowSharedWriteLocks) {
+ THROW_ERROR_EXCEPTION("Shared write locks are not allowed for the table");
+ }
+
+ if (mappedId >= schema.GetKeyColumnCount()) {
+ hasNonKeyColumns = true;
+
+ if (lockType != ELockType::Exclusive && lockType != ELockType::SharedWrite) {
+ THROW_ERROR_EXCEPTION("No write lock taken for column %Qv",
+ nameTable->GetName(value.Id));
+ }
+ }
+ }
+
+ return hasNonKeyColumns;
+}
+
void ValidateClientKey(TLegacyKey key)
{
for (const auto& value : key) {
diff --git a/yt/yt/client/table_client/unversioned_row.h b/yt/yt/client/table_client/unversioned_row.h
index 7557836241..ccbb73e03f 100644
--- a/yt/yt/client/table_client/unversioned_row.h
+++ b/yt/yt/client/table_client/unversioned_row.h
@@ -444,6 +444,16 @@ void ValidateDuplicateAndRequiredValueColumns(
const TNameTableToSchemaIdMapping& idMapping,
std::vector<bool>* columnPresenceBuffer);
+//! Checks that #row contains write lock for non-key columns and returns true if any non-key columns encountered.
+bool ValidateNonKeyColumnsAgainstLock(
+ TUnversionedRow row,
+ const TLockMask& locks,
+ const TTableSchema& schema,
+ const TNameTableToSchemaIdMapping& idMapping,
+ const TNameTablePtr nameTable,
+ const std::vector<int>& columnIndexToLockIndex,
+ bool allowSharedWriteLocks);
+
//! Checks that #key is a valid client-side key. Throws on failure.
/*! The components must pass #ValidateKeyValue check. */
void ValidateClientKey(TLegacyKey key);
diff --git a/yt/yt/client/tablet_client/table_mount_cache.h b/yt/yt/client/tablet_client/table_mount_cache.h
index 44fa398800..bba0a1e0c0 100644
--- a/yt/yt/client/tablet_client/table_mount_cache.h
+++ b/yt/yt/client/tablet_client/table_mount_cache.h
@@ -128,6 +128,9 @@ struct TTableMountInfo
bool EnableDetailedProfiling = false;
+ // COMPAT(ponasenko-rs)
+ bool EnableSharedWriteLocks = false;
+
bool IsSorted() const;
bool IsOrdered() const;
bool IsReplicated() const;