diff options
author | ponasenko-rs <ponasenko-rs@yandex-team.com> | 2024-04-03 16:40:17 +0300 |
---|---|---|
committer | ponasenko-rs <ponasenko-rs@yandex-team.com> | 2024-04-03 16:54:50 +0300 |
commit | 8804fe4b72cdcc6c1c77564d15d7790c4cde512c (patch) | |
tree | 89395ed76a0d85358bc275124619ead8cb9eed78 | |
parent | bd6fa12b5d3bc14249eb09846c0c4e65fc12368c (diff) | |
download | ydb-8804fe4b72cdcc6c1c77564d15d7790c4cde512c.tar.gz |
YT-20426: Support shared write locks on rpc proxy
3204feb9fdc0206822f63b61749f4622c4398afb
-rw-r--r-- | yt/yt/client/api/rpc_proxy/public.h | 1 | ||||
-rw-r--r-- | yt/yt/client/api/rpc_proxy/transaction_impl.cpp | 23 | ||||
-rw-r--r-- | yt/yt/client/table_client/public.h | 2 | ||||
-rw-r--r-- | yt/yt/client/table_client/schema.cpp | 33 | ||||
-rw-r--r-- | yt/yt/client/table_client/schema.h | 3 | ||||
-rw-r--r-- | yt/yt/client/tablet_client/public.h | 6 | ||||
-rw-r--r-- | yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto | 10 | ||||
-rw-r--r-- | yt/yt_proto/yt/client/tablet_client/proto/lock_mask.proto | 13 | ||||
-rw-r--r-- | yt/yt_proto/yt/client/ya.make | 2 |
9 files changed, 82 insertions, 11 deletions
diff --git a/yt/yt/client/api/rpc_proxy/public.h b/yt/yt/client/api/rpc_proxy/public.h index 33f580a444..fd74de1fef 100644 --- a/yt/yt/client/api/rpc_proxy/public.h +++ b/yt/yt/client/api/rpc_proxy/public.h @@ -27,6 +27,7 @@ YT_DEFINE_ERROR_ENUM( DEFINE_ENUM(ERpcProxyFeature, ((GetInSyncWithoutKeys)(0)) + ((WideLocks) (1)) ); //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/client/api/rpc_proxy/transaction_impl.cpp b/yt/yt/client/api/rpc_proxy/transaction_impl.cpp index 7089f413e1..28027efa25 100644 --- a/yt/yt/client/api/rpc_proxy/transaction_impl.cpp +++ b/yt/yt/client/api/rpc_proxy/transaction_impl.cpp @@ -396,14 +396,16 @@ void TTransaction::ModifyRows( rows.reserve(modifications.Size()); bool usedStrongLocks = false; + bool usedWideLocks = false; for (const auto& modification : modifications) { auto mask = modification.Locks; + usedWideLocks |= mask.GetSize() > TLegacyLockMask::MaxCount; + if (usedWideLocks) { + break; + } + for (int index = 0; index < TLegacyLockMask::MaxCount; ++index) { - if (mask.Get(index) > MaxOldLockType) { - THROW_ERROR_EXCEPTION("New locks are not supported in RPC client yet") - << TErrorAttribute("lock_index", index) - << TErrorAttribute("lock_type", mask.Get(index)); - } + usedWideLocks |= mask.Get(index) > MaxOldLockType; usedStrongLocks |= mask.Get(index) == ELockType::SharedStrong; } } @@ -411,14 +413,19 @@ void TTransaction::ModifyRows( if (usedStrongLocks) { req->Header().set_protocol_version_minor(YTRpcModifyRowsStrongLocksVersion); } + if (usedWideLocks) { + req->RequireServerFeature(ERpcProxyFeature::WideLocks); + } for (const auto& modification : modifications) { rows.emplace_back(modification.Row); req->add_row_modification_types(static_cast<NProto::ERowModificationType>(modification.Type)); - if (usedStrongLocks) { + if (usedWideLocks) { + ToProto(req->add_row_locks(), modification.Locks); + } else if (usedStrongLocks) { auto locks = modification.Locks; YT_VERIFY(!locks.HasNewLocks()); - req->add_row_locks(locks.ToLegacyMask().GetBitmap()); + req->add_row_legacy_locks(locks.ToLegacyMask().GetBitmap()); } else { TLegacyLockBitmap bitmap = 0; for (int index = 0; index < TLegacyLockMask::MaxCount; ++index) { @@ -426,7 +433,7 @@ void TTransaction::ModifyRows( bitmap |= 1u << index; } } - req->add_row_read_locks(bitmap); + req->add_row_legacy_read_locks(bitmap); } } diff --git a/yt/yt/client/table_client/public.h b/yt/yt/client/table_client/public.h index 4de8d2f2b7..73b6c8dca6 100644 --- a/yt/yt/client/table_client/public.h +++ b/yt/yt/client/table_client/public.h @@ -4,6 +4,8 @@ #include <yt/yt/client/cypress_client/public.h> +#include <yt/yt/client/tablet_client/public.h> + #include <yt/yt/client/transaction_client/public.h> #include <yt/yt/core/misc/range.h> diff --git a/yt/yt/client/table_client/schema.cpp b/yt/yt/client/table_client/schema.cpp index da847dcc23..672cb8a520 100644 --- a/yt/yt/client/table_client/schema.cpp +++ b/yt/yt/client/table_client/schema.cpp @@ -22,6 +22,8 @@ #include <yt/yt_proto/yt/client/table_chunk_format/proto/chunk_meta.pb.h> #include <yt/yt_proto/yt/client/table_chunk_format/proto/wire_protocol.pb.h> +#include <yt/yt_proto/yt/client/tablet_client/proto/lock_mask.pb.h> + namespace NYT::NTableClient { using namespace NChunkClient; @@ -112,6 +114,37 @@ TLockMask MaxMask(TLockMask lhs, TLockMask rhs) return lhs; } +void ToProto(NTabletClient::NProto::TLockMask* protoLockMask, const TLockMask& lockMask) +{ + auto size = lockMask.GetSize(); + YT_VERIFY(size <= TLockMask::MaxSize); + + protoLockMask->set_size(size); + + const auto& bitmap = lockMask.GetBitmap(); + auto wordCount = DivCeil(size, TLockMask::LocksPerWord); + YT_VERIFY(std::ssize(bitmap) >= wordCount); + + protoLockMask->clear_bitmap(); + for (int index = 0; index < wordCount; ++index) { + protoLockMask->add_bitmap(bitmap[index]); + } +} + +void FromProto(TLockMask* lockMask, const NTabletClient::NProto::TLockMask& protoLockMask) +{ + auto size = protoLockMask.size(); + auto wordCount = DivCeil<int>(size, TLockMask::LocksPerWord); + + TLockBitmap bitmap; + bitmap.reserve(wordCount); + for (int index = 0; index < wordCount; ++index) { + bitmap.push_back(protoLockMask.bitmap(index)); + } + + *lockMask = TLockMask(bitmap, size); +} + //////////////////////////////////////////////////////////////////////////////// TColumnSchema::TColumnSchema() diff --git a/yt/yt/client/table_client/schema.h b/yt/yt/client/table_client/schema.h index 91729bc8cc..a556a0637d 100644 --- a/yt/yt/client/table_client/schema.h +++ b/yt/yt/client/table_client/schema.h @@ -107,6 +107,9 @@ bool operator == (const TLockMask& lhs, const TLockMask& rhs); TLockMask MaxMask(TLockMask lhs, TLockMask rhs); +void ToProto(NTabletClient::NProto::TLockMask* protoLockMask, const TLockMask& lockMask); +void FromProto(TLockMask* lockMask, const NTabletClient::NProto::TLockMask& protoLockMask); + //////////////////////////////////////////////////////////////////////////////// class TColumnSchema diff --git a/yt/yt/client/tablet_client/public.h b/yt/yt/client/tablet_client/public.h index 5d0fe07822..e2bc658175 100644 --- a/yt/yt/client/tablet_client/public.h +++ b/yt/yt/client/tablet_client/public.h @@ -10,6 +10,12 @@ namespace NYT::NTabletClient { //////////////////////////////////////////////////////////////////////////////// +namespace NProto { + +class TLockMask; + +} // namespace NProto + DEFINE_ENUM(ETabletState, // Individual states ((Mounting) (0)) diff --git a/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto b/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto index 860672c48f..6e4ce5a93d 100644 --- a/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto +++ b/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto @@ -10,9 +10,10 @@ import "yt_proto/yt/core/misc/proto/guid.proto"; import "yt_proto/yt/core/misc/proto/error.proto"; import "yt_proto/yt/core/ytree/proto/attributes.proto"; import "yt_proto/yt/core/ytree/proto/request_complexity_limits.proto"; -import "yt_proto/yt/client/hive/proto/timestamp_map.proto"; import "yt_proto/yt/client/chunk_client/proto/data_statistics.proto"; import "yt_proto/yt/client/chaos_client/proto/replication_card.proto"; +import "yt_proto/yt/client/hive/proto/timestamp_map.proto"; +import "yt_proto/yt/client/tablet_client/proto/lock_mask.proto"; //////////////////////////////////////////////////////////////////////////////// // Scalars @@ -905,14 +906,17 @@ message TReqModifyRows required string path = 2; repeated ERowModificationType row_modification_types = 3; // COMPAT(lukyan): Remove after RPC protocol version update - repeated uint32 row_read_locks = 7; - repeated uint64 row_locks = 8; + repeated uint32 row_legacy_read_locks = 7; + // COMPAT(ponasenko-rs): Remove after RPC protocol version update + repeated uint64 row_legacy_locks = 8; optional bool require_sync_replica = 4; optional NYT.NProto.TGuid upstream_replica_id = 5; optional bool allow_missing_key_columns = 10; + repeated NYT.NTabletClient.NProto.TLockMask row_locks = 11; + required TRowsetDescriptor rowset_descriptor = 200; } diff --git a/yt/yt_proto/yt/client/tablet_client/proto/lock_mask.proto b/yt/yt_proto/yt/client/tablet_client/proto/lock_mask.proto new file mode 100644 index 0000000000..55d74bf6a0 --- /dev/null +++ b/yt/yt_proto/yt/client/tablet_client/proto/lock_mask.proto @@ -0,0 +1,13 @@ +package NYT.NTabletClient.NProto; + +option go_package = "a.yandex-team.ru/yt/go/proto/client/tablet_client"; + +//////////////////////////////////////////////////////////////////////////////// + +message TLockMask +{ + repeated uint64 bitmap = 1; + required uint32 size = 2; +} + +//////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt_proto/yt/client/ya.make b/yt/yt_proto/yt/client/ya.make index 8c742acce8..7552ce952d 100644 --- a/yt/yt_proto/yt/client/ya.make +++ b/yt/yt_proto/yt/client/ya.make @@ -38,6 +38,8 @@ SRCS( table_chunk_format/proto/column_meta.proto table_chunk_format/proto/wire_protocol.proto + tablet_client/proto/lock_mask.proto + transaction_client/proto/timestamp_service.proto query_client/proto/query_statistics.proto |