aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorponasenko-rs <ponasenko-rs@yandex-team.com>2024-04-03 16:40:17 +0300
committerponasenko-rs <ponasenko-rs@yandex-team.com>2024-04-03 16:54:50 +0300
commit8804fe4b72cdcc6c1c77564d15d7790c4cde512c (patch)
tree89395ed76a0d85358bc275124619ead8cb9eed78
parentbd6fa12b5d3bc14249eb09846c0c4e65fc12368c (diff)
downloadydb-8804fe4b72cdcc6c1c77564d15d7790c4cde512c.tar.gz
YT-20426: Support shared write locks on rpc proxy
3204feb9fdc0206822f63b61749f4622c4398afb
-rw-r--r--yt/yt/client/api/rpc_proxy/public.h1
-rw-r--r--yt/yt/client/api/rpc_proxy/transaction_impl.cpp23
-rw-r--r--yt/yt/client/table_client/public.h2
-rw-r--r--yt/yt/client/table_client/schema.cpp33
-rw-r--r--yt/yt/client/table_client/schema.h3
-rw-r--r--yt/yt/client/tablet_client/public.h6
-rw-r--r--yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto10
-rw-r--r--yt/yt_proto/yt/client/tablet_client/proto/lock_mask.proto13
-rw-r--r--yt/yt_proto/yt/client/ya.make2
9 files changed, 82 insertions, 11 deletions
diff --git a/yt/yt/client/api/rpc_proxy/public.h b/yt/yt/client/api/rpc_proxy/public.h
index 33f580a444..fd74de1fef 100644
--- a/yt/yt/client/api/rpc_proxy/public.h
+++ b/yt/yt/client/api/rpc_proxy/public.h
@@ -27,6 +27,7 @@ YT_DEFINE_ERROR_ENUM(
DEFINE_ENUM(ERpcProxyFeature,
((GetInSyncWithoutKeys)(0))
+ ((WideLocks) (1))
);
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/client/api/rpc_proxy/transaction_impl.cpp b/yt/yt/client/api/rpc_proxy/transaction_impl.cpp
index 7089f413e1..28027efa25 100644
--- a/yt/yt/client/api/rpc_proxy/transaction_impl.cpp
+++ b/yt/yt/client/api/rpc_proxy/transaction_impl.cpp
@@ -396,14 +396,16 @@ void TTransaction::ModifyRows(
rows.reserve(modifications.Size());
bool usedStrongLocks = false;
+ bool usedWideLocks = false;
for (const auto& modification : modifications) {
auto mask = modification.Locks;
+ usedWideLocks |= mask.GetSize() > TLegacyLockMask::MaxCount;
+ if (usedWideLocks) {
+ break;
+ }
+
for (int index = 0; index < TLegacyLockMask::MaxCount; ++index) {
- if (mask.Get(index) > MaxOldLockType) {
- THROW_ERROR_EXCEPTION("New locks are not supported in RPC client yet")
- << TErrorAttribute("lock_index", index)
- << TErrorAttribute("lock_type", mask.Get(index));
- }
+ usedWideLocks |= mask.Get(index) > MaxOldLockType;
usedStrongLocks |= mask.Get(index) == ELockType::SharedStrong;
}
}
@@ -411,14 +413,19 @@ void TTransaction::ModifyRows(
if (usedStrongLocks) {
req->Header().set_protocol_version_minor(YTRpcModifyRowsStrongLocksVersion);
}
+ if (usedWideLocks) {
+ req->RequireServerFeature(ERpcProxyFeature::WideLocks);
+ }
for (const auto& modification : modifications) {
rows.emplace_back(modification.Row);
req->add_row_modification_types(static_cast<NProto::ERowModificationType>(modification.Type));
- if (usedStrongLocks) {
+ if (usedWideLocks) {
+ ToProto(req->add_row_locks(), modification.Locks);
+ } else if (usedStrongLocks) {
auto locks = modification.Locks;
YT_VERIFY(!locks.HasNewLocks());
- req->add_row_locks(locks.ToLegacyMask().GetBitmap());
+ req->add_row_legacy_locks(locks.ToLegacyMask().GetBitmap());
} else {
TLegacyLockBitmap bitmap = 0;
for (int index = 0; index < TLegacyLockMask::MaxCount; ++index) {
@@ -426,7 +433,7 @@ void TTransaction::ModifyRows(
bitmap |= 1u << index;
}
}
- req->add_row_read_locks(bitmap);
+ req->add_row_legacy_read_locks(bitmap);
}
}
diff --git a/yt/yt/client/table_client/public.h b/yt/yt/client/table_client/public.h
index 4de8d2f2b7..73b6c8dca6 100644
--- a/yt/yt/client/table_client/public.h
+++ b/yt/yt/client/table_client/public.h
@@ -4,6 +4,8 @@
#include <yt/yt/client/cypress_client/public.h>
+#include <yt/yt/client/tablet_client/public.h>
+
#include <yt/yt/client/transaction_client/public.h>
#include <yt/yt/core/misc/range.h>
diff --git a/yt/yt/client/table_client/schema.cpp b/yt/yt/client/table_client/schema.cpp
index da847dcc23..672cb8a520 100644
--- a/yt/yt/client/table_client/schema.cpp
+++ b/yt/yt/client/table_client/schema.cpp
@@ -22,6 +22,8 @@
#include <yt/yt_proto/yt/client/table_chunk_format/proto/chunk_meta.pb.h>
#include <yt/yt_proto/yt/client/table_chunk_format/proto/wire_protocol.pb.h>
+#include <yt/yt_proto/yt/client/tablet_client/proto/lock_mask.pb.h>
+
namespace NYT::NTableClient {
using namespace NChunkClient;
@@ -112,6 +114,37 @@ TLockMask MaxMask(TLockMask lhs, TLockMask rhs)
return lhs;
}
+void ToProto(NTabletClient::NProto::TLockMask* protoLockMask, const TLockMask& lockMask)
+{
+ auto size = lockMask.GetSize();
+ YT_VERIFY(size <= TLockMask::MaxSize);
+
+ protoLockMask->set_size(size);
+
+ const auto& bitmap = lockMask.GetBitmap();
+ auto wordCount = DivCeil(size, TLockMask::LocksPerWord);
+ YT_VERIFY(std::ssize(bitmap) >= wordCount);
+
+ protoLockMask->clear_bitmap();
+ for (int index = 0; index < wordCount; ++index) {
+ protoLockMask->add_bitmap(bitmap[index]);
+ }
+}
+
+void FromProto(TLockMask* lockMask, const NTabletClient::NProto::TLockMask& protoLockMask)
+{
+ auto size = protoLockMask.size();
+ auto wordCount = DivCeil<int>(size, TLockMask::LocksPerWord);
+
+ TLockBitmap bitmap;
+ bitmap.reserve(wordCount);
+ for (int index = 0; index < wordCount; ++index) {
+ bitmap.push_back(protoLockMask.bitmap(index));
+ }
+
+ *lockMask = TLockMask(bitmap, size);
+}
+
////////////////////////////////////////////////////////////////////////////////
TColumnSchema::TColumnSchema()
diff --git a/yt/yt/client/table_client/schema.h b/yt/yt/client/table_client/schema.h
index 91729bc8cc..a556a0637d 100644
--- a/yt/yt/client/table_client/schema.h
+++ b/yt/yt/client/table_client/schema.h
@@ -107,6 +107,9 @@ bool operator == (const TLockMask& lhs, const TLockMask& rhs);
TLockMask MaxMask(TLockMask lhs, TLockMask rhs);
+void ToProto(NTabletClient::NProto::TLockMask* protoLockMask, const TLockMask& lockMask);
+void FromProto(TLockMask* lockMask, const NTabletClient::NProto::TLockMask& protoLockMask);
+
////////////////////////////////////////////////////////////////////////////////
class TColumnSchema
diff --git a/yt/yt/client/tablet_client/public.h b/yt/yt/client/tablet_client/public.h
index 5d0fe07822..e2bc658175 100644
--- a/yt/yt/client/tablet_client/public.h
+++ b/yt/yt/client/tablet_client/public.h
@@ -10,6 +10,12 @@ namespace NYT::NTabletClient {
////////////////////////////////////////////////////////////////////////////////
+namespace NProto {
+
+class TLockMask;
+
+} // namespace NProto
+
DEFINE_ENUM(ETabletState,
// Individual states
((Mounting) (0))
diff --git a/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto b/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto
index 860672c48f..6e4ce5a93d 100644
--- a/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto
+++ b/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto
@@ -10,9 +10,10 @@ import "yt_proto/yt/core/misc/proto/guid.proto";
import "yt_proto/yt/core/misc/proto/error.proto";
import "yt_proto/yt/core/ytree/proto/attributes.proto";
import "yt_proto/yt/core/ytree/proto/request_complexity_limits.proto";
-import "yt_proto/yt/client/hive/proto/timestamp_map.proto";
import "yt_proto/yt/client/chunk_client/proto/data_statistics.proto";
import "yt_proto/yt/client/chaos_client/proto/replication_card.proto";
+import "yt_proto/yt/client/hive/proto/timestamp_map.proto";
+import "yt_proto/yt/client/tablet_client/proto/lock_mask.proto";
////////////////////////////////////////////////////////////////////////////////
// Scalars
@@ -905,14 +906,17 @@ message TReqModifyRows
required string path = 2;
repeated ERowModificationType row_modification_types = 3;
// COMPAT(lukyan): Remove after RPC protocol version update
- repeated uint32 row_read_locks = 7;
- repeated uint64 row_locks = 8;
+ repeated uint32 row_legacy_read_locks = 7;
+ // COMPAT(ponasenko-rs): Remove after RPC protocol version update
+ repeated uint64 row_legacy_locks = 8;
optional bool require_sync_replica = 4;
optional NYT.NProto.TGuid upstream_replica_id = 5;
optional bool allow_missing_key_columns = 10;
+ repeated NYT.NTabletClient.NProto.TLockMask row_locks = 11;
+
required TRowsetDescriptor rowset_descriptor = 200;
}
diff --git a/yt/yt_proto/yt/client/tablet_client/proto/lock_mask.proto b/yt/yt_proto/yt/client/tablet_client/proto/lock_mask.proto
new file mode 100644
index 0000000000..55d74bf6a0
--- /dev/null
+++ b/yt/yt_proto/yt/client/tablet_client/proto/lock_mask.proto
@@ -0,0 +1,13 @@
+package NYT.NTabletClient.NProto;
+
+option go_package = "a.yandex-team.ru/yt/go/proto/client/tablet_client";
+
+////////////////////////////////////////////////////////////////////////////////
+
+message TLockMask
+{
+ repeated uint64 bitmap = 1;
+ required uint32 size = 2;
+}
+
+////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt_proto/yt/client/ya.make b/yt/yt_proto/yt/client/ya.make
index 8c742acce8..7552ce952d 100644
--- a/yt/yt_proto/yt/client/ya.make
+++ b/yt/yt_proto/yt/client/ya.make
@@ -38,6 +38,8 @@ SRCS(
table_chunk_format/proto/column_meta.proto
table_chunk_format/proto/wire_protocol.proto
+ tablet_client/proto/lock_mask.proto
+
transaction_client/proto/timestamp_service.proto
query_client/proto/query_statistics.proto