aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorkruall <kruall@ydb.tech>2023-06-21 15:45:15 +0300
committerkruall <kruall@ydb.tech>2023-06-21 15:45:15 +0300
commit639e4f87a1eab243840614a867086f32356ffde2 (patch)
tree4dd3b7f845d389a45498bffc4cf354711648f8d4
parent21d8ea644e826af29c32c267c542a69cc139effb (diff)
downloadydb-639e4f87a1eab243840614a867086f32356ffde2.tar.gz
Move kv api logic to oss,
-rw-r--r--ydb/core/grpc_services/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/grpc_services/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/grpc_services/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/grpc_services/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/grpc_services/rpc_keyvalue.cpp1048
-rw-r--r--ydb/core/grpc_services/service_keyvalue.h23
-rw-r--r--ydb/core/grpc_services/ya.make1
-rw-r--r--ydb/services/CMakeLists.txt1
-rw-r--r--ydb/services/keyvalue/CMakeLists.darwin-x86_64.txt24
-rw-r--r--ydb/services/keyvalue/CMakeLists.linux-aarch64.txt25
-rw-r--r--ydb/services/keyvalue/CMakeLists.linux-x86_64.txt25
-rw-r--r--ydb/services/keyvalue/CMakeLists.txt17
-rw-r--r--ydb/services/keyvalue/CMakeLists.windows-x86_64.txt24
-rw-r--r--ydb/services/keyvalue/grpc_service.cpp81
-rw-r--r--ydb/services/keyvalue/grpc_service.h37
-rw-r--r--ydb/services/keyvalue/grpc_service_ut.cpp838
-rw-r--r--ydb/services/keyvalue/ut/CMakeLists.darwin-x86_64.txt80
-rw-r--r--ydb/services/keyvalue/ut/CMakeLists.linux-aarch64.txt83
-rw-r--r--ydb/services/keyvalue/ut/CMakeLists.linux-x86_64.txt85
-rw-r--r--ydb/services/keyvalue/ut/CMakeLists.txt17
-rw-r--r--ydb/services/keyvalue/ut/CMakeLists.windows-x86_64.txt73
-rw-r--r--ydb/services/keyvalue/ut/ya.make19
-rw-r--r--ydb/services/keyvalue/ya.make20
-rw-r--r--ydb/services/ya.make1
24 files changed, 2526 insertions, 0 deletions
diff --git a/ydb/core/grpc_services/CMakeLists.darwin-x86_64.txt b/ydb/core/grpc_services/CMakeLists.darwin-x86_64.txt
index 3bb4353c63..518f9b5671 100644
--- a/ydb/core/grpc_services/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/grpc_services/CMakeLists.darwin-x86_64.txt
@@ -109,6 +109,7 @@ target_sources(ydb-core-grpc_services PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_import.cpp
${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_import_data.cpp
${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_keep_alive.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_keyvalue.cpp
${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_kh_describe.cpp
${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_kh_snapshots.cpp
${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_kqp_base.cpp
diff --git a/ydb/core/grpc_services/CMakeLists.linux-aarch64.txt b/ydb/core/grpc_services/CMakeLists.linux-aarch64.txt
index aa9c33bfe2..2316cc559b 100644
--- a/ydb/core/grpc_services/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/grpc_services/CMakeLists.linux-aarch64.txt
@@ -110,6 +110,7 @@ target_sources(ydb-core-grpc_services PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_import.cpp
${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_import_data.cpp
${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_keep_alive.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_keyvalue.cpp
${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_kh_describe.cpp
${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_kh_snapshots.cpp
${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_kqp_base.cpp
diff --git a/ydb/core/grpc_services/CMakeLists.linux-x86_64.txt b/ydb/core/grpc_services/CMakeLists.linux-x86_64.txt
index aa9c33bfe2..2316cc559b 100644
--- a/ydb/core/grpc_services/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/grpc_services/CMakeLists.linux-x86_64.txt
@@ -110,6 +110,7 @@ target_sources(ydb-core-grpc_services PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_import.cpp
${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_import_data.cpp
${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_keep_alive.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_keyvalue.cpp
${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_kh_describe.cpp
${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_kh_snapshots.cpp
${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_kqp_base.cpp
diff --git a/ydb/core/grpc_services/CMakeLists.windows-x86_64.txt b/ydb/core/grpc_services/CMakeLists.windows-x86_64.txt
index 3bb4353c63..518f9b5671 100644
--- a/ydb/core/grpc_services/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/grpc_services/CMakeLists.windows-x86_64.txt
@@ -109,6 +109,7 @@ target_sources(ydb-core-grpc_services PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_import.cpp
${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_import_data.cpp
${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_keep_alive.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_keyvalue.cpp
${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_kh_describe.cpp
${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_kh_snapshots.cpp
${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_kqp_base.cpp
diff --git a/ydb/core/grpc_services/rpc_keyvalue.cpp b/ydb/core/grpc_services/rpc_keyvalue.cpp
new file mode 100644
index 0000000000..aea0d568c9
--- /dev/null
+++ b/ydb/core/grpc_services/rpc_keyvalue.cpp
@@ -0,0 +1,1048 @@
+#include "service_keyvalue.h"
+
+#include <ydb/public/api/protos/ydb_keyvalue.pb.h>
+
+#include <ydb/core/base/path.h>
+#include <ydb/core/grpc_services/rpc_scheme_base.h>
+#include <ydb/core/grpc_services/rpc_common.h>
+#include <ydb/core/keyvalue/keyvalue_events.h>
+#include <ydb/core/tx/scheme_cache/scheme_cache.h>
+#include <ydb/core/mind/local.h>
+#include <ydb/core/protos/local.pb.h>
+
+
+namespace NKikimr::NGRpcService {
+
+using namespace NActors;
+using namespace Ydb;
+
+using TEvCreateVolumeKeyValueRequest =
+ TGrpcRequestOperationCall<Ydb::KeyValue::CreateVolumeRequest,
+ Ydb::KeyValue::CreateVolumeResponse>;
+using TEvDropVolumeKeyValueRequest =
+ TGrpcRequestOperationCall<Ydb::KeyValue::DropVolumeRequest,
+ Ydb::KeyValue::DropVolumeResponse>;
+using TEvAlterVolumeKeyValueRequest =
+ TGrpcRequestOperationCall<Ydb::KeyValue::AlterVolumeRequest,
+ Ydb::KeyValue::AlterVolumeResponse>;
+using TEvDescribeVolumeKeyValueRequest =
+ TGrpcRequestOperationCall<Ydb::KeyValue::DescribeVolumeRequest,
+ Ydb::KeyValue::DescribeVolumeResponse>;
+using TEvListLocalPartitionsKeyValueRequest =
+ TGrpcRequestOperationCall<Ydb::KeyValue::ListLocalPartitionsRequest,
+ Ydb::KeyValue::ListLocalPartitionsResponse>;
+
+using TEvAcquireLockKeyValueRequest =
+ TGrpcRequestOperationCall<Ydb::KeyValue::AcquireLockRequest,
+ Ydb::KeyValue::AcquireLockResponse>;
+using TEvExecuteTransactionKeyValueRequest =
+ TGrpcRequestOperationCall<Ydb::KeyValue::ExecuteTransactionRequest,
+ Ydb::KeyValue::ExecuteTransactionResponse>;
+using TEvReadKeyValueRequest =
+ TGrpcRequestOperationCall<Ydb::KeyValue::ReadRequest,
+ Ydb::KeyValue::ReadResponse>;
+using TEvReadRangeKeyValueRequest =
+ TGrpcRequestOperationCall<Ydb::KeyValue::ReadRangeRequest,
+ Ydb::KeyValue::ReadRangeResponse>;
+using TEvListRangeKeyValueRequest =
+ TGrpcRequestOperationCall<Ydb::KeyValue::ListRangeRequest,
+ Ydb::KeyValue::ListRangeResponse>;
+using TEvGetStorageChannelStatusKeyValueRequest =
+ TGrpcRequestOperationCall<Ydb::KeyValue::GetStorageChannelStatusRequest,
+ Ydb::KeyValue::GetStorageChannelStatusResponse>;
+
+} // namespace NKikimr::NGRpcService
+
+
+namespace NKikimr::NGRpcService {
+
+using namespace NActors;
+using namespace Ydb;
+
+#define COPY_PRIMITIVE_FIELD(name) \
+ to->set_ ## name(static_cast<decltype(to->name())>(from.name())) \
+// COPY_PRIMITIVE_FIELD
+
+#define COPY_PRIMITIVE_OPTIONAL_FIELD(name) \
+ if (from.has_ ## name()) { \
+ to->set_ ## name(static_cast<decltype(to->name())>(from.name())); \
+ } \
+// COPY_PRIMITIVE_FIELD
+
+namespace {
+
+void CopyProtobuf(const Ydb::KeyValue::AcquireLockRequest &/*from*/,
+ NKikimrKeyValue::AcquireLockRequest */*to*/)
+{
+}
+
+void CopyProtobuf(const NKikimrKeyValue::AcquireLockResult &from,
+ Ydb::KeyValue::AcquireLockResult *to)
+{
+ COPY_PRIMITIVE_FIELD(lock_generation);
+ COPY_PRIMITIVE_FIELD(node_id);
+}
+
+
+void CopyProtobuf(const Ydb::KeyValue::ExecuteTransactionRequest::Command::Rename &from,
+ NKikimrKeyValue::ExecuteTransactionRequest::Command::Rename *to)
+{
+ COPY_PRIMITIVE_FIELD(old_key);
+ COPY_PRIMITIVE_FIELD(new_key);
+}
+
+void CopyProtobuf(const Ydb::KeyValue::ExecuteTransactionRequest::Command::Concat &from,
+ NKikimrKeyValue::ExecuteTransactionRequest::Command::Concat *to)
+{
+ *to->mutable_input_keys() = from.input_keys();
+ COPY_PRIMITIVE_FIELD(output_key);
+ COPY_PRIMITIVE_FIELD(keep_inputs);
+}
+
+void CopyProtobuf(const Ydb::KeyValue::KeyRange &from, NKikimrKeyValue::KVRange *to) {
+#define CHECK_AND_SET(name) \
+ if (from.has_ ## name()) { \
+ COPY_PRIMITIVE_FIELD(name); \
+ } \
+// CHECK_AND_SET
+
+ CHECK_AND_SET(from_key_inclusive)
+ CHECK_AND_SET(from_key_exclusive)
+ CHECK_AND_SET(to_key_inclusive)
+ CHECK_AND_SET(to_key_exclusive)
+
+#undef CHECK_AND_SET
+}
+
+void CopyProtobuf(const Ydb::KeyValue::ExecuteTransactionRequest::Command::CopyRange &from,
+ NKikimrKeyValue::ExecuteTransactionRequest::Command::CopyRange *to)
+{
+ CopyProtobuf(from.range(), to->mutable_range());
+ COPY_PRIMITIVE_FIELD(prefix_to_remove);
+ COPY_PRIMITIVE_FIELD(prefix_to_add);
+}
+
+template <typename TProtoFrom, typename TProtoTo>
+void CopyPriority(TProtoFrom &&from, TProtoTo *to) {
+ switch(from.priority()) {
+ case Ydb::KeyValue::Priorities::PRIORITY_REALTIME:
+ to->set_priority(NKikimrKeyValue::Priorities::PRIORITY_REALTIME);
+ break;
+ case Ydb::KeyValue::Priorities::PRIORITY_BACKGROUND:
+ to->set_priority(NKikimrKeyValue::Priorities::PRIORITY_BACKGROUND);
+ break;
+ default:
+ to->set_priority(NKikimrKeyValue::Priorities::PRIORITY_UNSPECIFIED);
+ break;
+ }
+}
+
+void CopyProtobuf(const Ydb::KeyValue::ExecuteTransactionRequest::Command::Write &from,
+ NKikimrKeyValue::ExecuteTransactionRequest::Command::Write *to)
+{
+ COPY_PRIMITIVE_FIELD(key);
+ COPY_PRIMITIVE_FIELD(value);
+ COPY_PRIMITIVE_FIELD(storage_channel);
+ CopyPriority(from, to);
+ switch(from.tactic()) {
+ case Ydb::KeyValue::ExecuteTransactionRequest::Command::Write::TACTIC_MAX_THROUGHPUT:
+ to->set_tactic(NKikimrKeyValue::ExecuteTransactionRequest::Command::Write::TACTIC_MAX_THROUGHPUT);
+ break;
+ case Ydb::KeyValue::ExecuteTransactionRequest::Command::Write::TACTIC_MIN_LATENCY:
+ to->set_tactic(NKikimrKeyValue::ExecuteTransactionRequest::Command::Write::TACTIC_MIN_LATENCY);
+ break;
+ default:
+ to->set_tactic(NKikimrKeyValue::ExecuteTransactionRequest::Command::Write::TACTIC_UNSPECIFIED);
+ break;
+ }
+}
+
+void CopyProtobuf(const Ydb::KeyValue::ExecuteTransactionRequest::Command::DeleteRange &from,
+ NKikimrKeyValue::ExecuteTransactionRequest::Command::DeleteRange *to)
+{
+ CopyProtobuf(from.range(), to->mutable_range());
+}
+
+void CopyProtobuf(const Ydb::KeyValue::ExecuteTransactionRequest::Command &from,
+ NKikimrKeyValue::ExecuteTransactionRequest::Command *to)
+{
+#define CHECK_AND_COPY(name) \
+ if (from.has_ ## name()) { \
+ CopyProtobuf(from.name(), to->mutable_ ## name()); \
+ } \
+// CHECK_AND_COPY
+
+ CHECK_AND_COPY(rename)
+ CHECK_AND_COPY(concat)
+ CHECK_AND_COPY(copy_range)
+ CHECK_AND_COPY(write)
+ CHECK_AND_COPY(delete_range)
+
+#undef CHECK_AND_COPY
+}
+
+void CopyProtobuf(const Ydb::KeyValue::ExecuteTransactionRequest &from,
+ NKikimrKeyValue::ExecuteTransactionRequest *to)
+{
+ COPY_PRIMITIVE_OPTIONAL_FIELD(lock_generation);
+ for (auto &cmd : from.commands()) {
+ CopyProtobuf(cmd, to->add_commands());
+ }
+}
+
+void CopyProtobuf(const NKikimrKeyValue::StorageChannel &from, Ydb::KeyValue::StorageChannelInfo *to) {
+ COPY_PRIMITIVE_FIELD(storage_channel);
+ COPY_PRIMITIVE_FIELD(status_flag);
+}
+
+void CopyProtobuf(const NKikimrKeyValue::ExecuteTransactionResult &from,
+ Ydb::KeyValue::ExecuteTransactionResult *to)
+{
+ COPY_PRIMITIVE_FIELD(node_id);
+ for (auto &channel : from.storage_channel()) {
+ CopyProtobuf(channel, to->add_storage_channel_info());
+ }
+}
+
+void CopyProtobuf(const Ydb::KeyValue::ReadRequest &from, NKikimrKeyValue::ReadRequest *to) {
+ COPY_PRIMITIVE_OPTIONAL_FIELD(lock_generation);
+ COPY_PRIMITIVE_FIELD(key);
+ COPY_PRIMITIVE_FIELD(offset);
+ COPY_PRIMITIVE_FIELD(size);
+ CopyPriority(from, to);
+ COPY_PRIMITIVE_FIELD(limit_bytes);
+}
+
+void CopyProtobuf(const NKikimrKeyValue::ReadResult &from, Ydb::KeyValue::ReadResult *to) {
+ COPY_PRIMITIVE_FIELD(requested_key);
+ COPY_PRIMITIVE_FIELD(requested_offset);
+ COPY_PRIMITIVE_FIELD(requested_size);
+ COPY_PRIMITIVE_FIELD(value);
+ COPY_PRIMITIVE_FIELD(node_id);
+ switch (from.status()) {
+ case NKikimrKeyValue::Statuses::RSTATUS_OVERRUN:
+ to->set_is_overrun(true);
+ break;
+ default:
+ break;
+ }
+}
+
+void CopyProtobuf(const Ydb::KeyValue::ReadRangeRequest &from, NKikimrKeyValue::ReadRangeRequest *to) {
+ COPY_PRIMITIVE_OPTIONAL_FIELD(lock_generation);
+ CopyProtobuf(from.range(), to->mutable_range());
+ to->set_include_data(true);
+ COPY_PRIMITIVE_FIELD(limit_bytes);
+ CopyPriority(from, to);
+}
+
+void CopyProtobuf(const Ydb::KeyValue::ListRangeRequest &from, NKikimrKeyValue::ReadRangeRequest *to) {
+ COPY_PRIMITIVE_OPTIONAL_FIELD(lock_generation);
+ CopyProtobuf(from.range(), to->mutable_range());
+ to->set_include_data(false);
+ COPY_PRIMITIVE_FIELD(limit_bytes);
+}
+
+void CopyProtobuf(const NKikimrKeyValue::ReadRangeResult::KeyValuePair &from,
+ Ydb::KeyValue::ReadRangeResult::KeyValuePair *to)
+{
+ COPY_PRIMITIVE_FIELD(key);
+ COPY_PRIMITIVE_FIELD(value);
+ COPY_PRIMITIVE_FIELD(creation_unix_time);
+ COPY_PRIMITIVE_FIELD(storage_channel);
+}
+
+void CopyProtobuf(const NKikimrKeyValue::ReadRangeResult &from,
+ Ydb::KeyValue::ReadRangeResult *to)
+{
+ for (auto &pair : from.pair()) {
+ CopyProtobuf(pair, to->add_pair());
+ }
+ if (from.status() == NKikimrKeyValue::Statuses::RSTATUS_OVERRUN) {
+ to->set_is_overrun(true);
+ }
+ COPY_PRIMITIVE_FIELD(node_id);
+}
+
+void CopyProtobuf(const NKikimrKeyValue::ReadRangeResult::KeyValuePair &from,
+ Ydb::KeyValue::ListRangeResult::KeyInfo *to)
+{
+ COPY_PRIMITIVE_FIELD(key);
+ COPY_PRIMITIVE_FIELD(value_size);
+ COPY_PRIMITIVE_FIELD(creation_unix_time);
+ COPY_PRIMITIVE_FIELD(storage_channel);
+}
+
+void CopyProtobuf(const NKikimrKeyValue::ReadRangeResult &from,
+ Ydb::KeyValue::ListRangeResult *to)
+{
+ for (auto &pair : from.pair()) {
+ CopyProtobuf(pair, to->add_key());
+ }
+ if (from.status() == NKikimrKeyValue::Statuses::RSTATUS_OVERRUN) {
+ to->set_is_overrun(true);
+ }
+ COPY_PRIMITIVE_FIELD(node_id);
+}
+
+void CopyProtobuf(const Ydb::KeyValue::GetStorageChannelStatusRequest &from,
+ NKikimrKeyValue::GetStorageChannelStatusRequest *to)
+{
+ COPY_PRIMITIVE_OPTIONAL_FIELD(lock_generation);
+ *to->mutable_storage_channel() = from.storage_channel();
+}
+
+
+void CopyProtobuf(const NKikimrKeyValue::GetStorageChannelStatusResult &from,
+ Ydb::KeyValue::GetStorageChannelStatusResult *to)
+{
+ for (auto &channel : from.storage_channel()) {
+ CopyProtobuf(channel, to->add_storage_channel_info());
+ }
+ COPY_PRIMITIVE_FIELD(node_id);
+}
+
+
+Ydb::StatusIds::StatusCode PullStatus(const NKikimrKeyValue::AcquireLockResult &) {
+ return Ydb::StatusIds::SUCCESS;
+}
+
+template <typename TResult>
+Ydb::StatusIds::StatusCode PullStatus(const TResult &result) {
+ switch (result.status()) {
+ case NKikimrKeyValue::Statuses::RSTATUS_OK:
+ case NKikimrKeyValue::Statuses::RSTATUS_OVERRUN:
+ return Ydb::StatusIds::SUCCESS;
+ case NKikimrKeyValue::Statuses::RSTATUS_ERROR:
+ return Ydb::StatusIds::GENERIC_ERROR;
+ case NKikimrKeyValue::Statuses::RSTATUS_TIMEOUT:
+ return Ydb::StatusIds::TIMEOUT;
+ case NKikimrKeyValue::Statuses::RSTATUS_NOT_FOUND:
+ return Ydb::StatusIds::NOT_FOUND;
+ case NKikimrKeyValue::Statuses::RSTATUS_WRONG_LOCK_GENERATION:
+ return Ydb::StatusIds::PRECONDITION_FAILED;
+ default:
+ return Ydb::StatusIds::INTERNAL_ERROR;
+ }
+}
+
+namespace {
+ void AssignPoolKinds(auto &storageConfig, auto *internalStorageConfig) {
+ ui32 size = storageConfig.channel_size();
+
+ for (ui32 channelIdx = 0; channelIdx < size; ++channelIdx) {
+ internalStorageConfig->AddChannel()->SetPreferredPoolKind(storageConfig.channel(channelIdx).media());
+ }
+ }
+}
+
+
+class TCreateVolumeRequest : public TRpcSchemeRequestActor<TCreateVolumeRequest, TEvCreateVolumeKeyValueRequest> {
+public:
+ using TBase = TRpcSchemeRequestActor<TCreateVolumeRequest, TEvCreateVolumeKeyValueRequest>;
+ using TBase::TBase;
+
+ void Bootstrap(const TActorContext& ctx) {
+ TBase::Bootstrap(ctx);
+ Become(&TCreateVolumeRequest::StateFunc);
+ SendProposeRequest(ctx);
+ }
+
+ void SendProposeRequest(const TActorContext &ctx) {
+ const auto req = this->GetProtoRequest();
+
+ std::pair<TString, TString> pathPair;
+ try {
+ pathPair = SplitPath(Request_->GetDatabaseName(), req->path());
+ } catch (const std::exception& ex) {
+ Request_->RaiseIssue(NYql::ExceptionToIssue(ex));
+ return Reply(StatusIds::BAD_REQUEST, ctx);
+ }
+ const auto& workingDir = pathPair.first;
+ const auto& name = pathPair.second;
+
+ std::unique_ptr<TEvTxUserProxy::TEvProposeTransaction> proposeRequest = this->CreateProposeTransaction();
+ NKikimrTxUserProxy::TEvProposeTransaction& record = proposeRequest->Record;
+ NKikimrSchemeOp::TModifyScheme* modifyScheme = record.MutableTransaction()->MutableModifyScheme();
+ modifyScheme->SetWorkingDir(workingDir);
+ NKikimrSchemeOp::TCreateSolomonVolume* tableDesc = nullptr;
+
+ modifyScheme->SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpCreateSolomonVolume);
+ tableDesc = modifyScheme->MutableCreateSolomonVolume();
+ tableDesc->SetName(name);
+ tableDesc->SetPartitionCount(req->partition_count());
+
+ if (GetProtoRequest()->has_storage_config()) {
+ auto &storageConfig = GetProtoRequest()->storage_config();
+ auto *internalStorageConfig = tableDesc->MutableStorageConfig();
+ AssignPoolKinds(storageConfig, internalStorageConfig);
+ } else {
+ tableDesc->SetChannelProfileId(GetProtoRequest()->partition_count());
+ }
+
+ ctx.Send(MakeTxProxyID(), proposeRequest.release());
+ }
+
+ STFUNC(StateFunc) {
+ return TBase::StateWork(ev);
+ }
+};
+
+
+class TDropVolumeRequest : public TRpcSchemeRequestActor<TDropVolumeRequest, TEvDropVolumeKeyValueRequest> {
+public:
+ using TBase = TRpcSchemeRequestActor<TDropVolumeRequest, TEvDropVolumeKeyValueRequest>;
+ using TBase::TBase;
+
+ void Bootstrap(const TActorContext& ctx) {
+ TBase::Bootstrap(ctx);
+ Become(&TDropVolumeRequest::StateFunc);
+ SendProposeRequest(ctx);
+ }
+
+ void SendProposeRequest(const TActorContext &ctx) {
+ const auto req = this->GetProtoRequest();
+
+ std::pair<TString, TString> pathPair;
+ try {
+ pathPair = SplitPath(req->path());
+ } catch (const std::exception& ex) {
+ Request_->RaiseIssue(NYql::ExceptionToIssue(ex));
+ return Reply(StatusIds::BAD_REQUEST, ctx);
+ }
+ const auto& workingDir = pathPair.first;
+ const auto& name = pathPair.second;
+
+ std::unique_ptr<TEvTxUserProxy::TEvProposeTransaction> proposeRequest = this->CreateProposeTransaction();
+ NKikimrTxUserProxy::TEvProposeTransaction& record = proposeRequest->Record;
+ NKikimrSchemeOp::TModifyScheme* modifyScheme = record.MutableTransaction()->MutableModifyScheme();
+ modifyScheme->SetWorkingDir(workingDir);
+ NKikimrSchemeOp::TDrop* drop = nullptr;
+
+ modifyScheme->SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpDropSolomonVolume);
+ drop = modifyScheme->MutableDrop();
+ drop->SetName(name);
+
+ ctx.Send(MakeTxProxyID(), proposeRequest.release());
+ }
+
+ STFUNC(StateFunc) {
+ return TBase::StateWork(ev);
+ }
+};
+
+class TAlterVolumeRequest : public TRpcSchemeRequestActor<TAlterVolumeRequest, TEvAlterVolumeKeyValueRequest> {
+public:
+ using TBase = TRpcSchemeRequestActor<TAlterVolumeRequest, TEvAlterVolumeKeyValueRequest>;
+ using TBase::TBase;
+
+ void Bootstrap(const TActorContext& ctx) {
+ TBase::Bootstrap(ctx);
+ Become(&TAlterVolumeRequest::StateFunc);
+ SendProposeRequest(ctx);
+ }
+
+ void SendProposeRequest(const TActorContext &ctx) {
+ const auto req = this->GetProtoRequest();
+
+ std::pair<TString, TString> pathPair;
+ try {
+ pathPair = SplitPath(req->path());
+ } catch (const std::exception& ex) {
+ Request_->RaiseIssue(NYql::ExceptionToIssue(ex));
+ return Reply(StatusIds::BAD_REQUEST, ctx);
+ }
+ const auto& workingDir = pathPair.first;
+ const auto& name = pathPair.second;
+
+ std::unique_ptr<TEvTxUserProxy::TEvProposeTransaction> proposeRequest = this->CreateProposeTransaction();
+ NKikimrTxUserProxy::TEvProposeTransaction& record = proposeRequest->Record;
+ NKikimrSchemeOp::TModifyScheme* modifyScheme = record.MutableTransaction()->MutableModifyScheme();
+ modifyScheme->SetWorkingDir(workingDir);
+ NKikimrSchemeOp::TAlterSolomonVolume* tableDesc = nullptr;
+
+ modifyScheme->SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpAlterSolomonVolume);
+ tableDesc = modifyScheme->MutableAlterSolomonVolume();
+ tableDesc->SetName(name);
+ tableDesc->SetPartitionCount(req->alter_partition_count());
+
+ if (GetProtoRequest()->has_storage_config()) {
+ tableDesc->SetUpdateChannelsBinding(true);
+ auto &storageConfig = GetProtoRequest()->storage_config();
+ auto *internalStorageConfig = tableDesc->MutableStorageConfig();
+ AssignPoolKinds(storageConfig, internalStorageConfig);
+ } else {
+ tableDesc->SetUpdateChannelsBinding(false);
+ tableDesc->SetChannelProfileId(0);
+ }
+
+ ctx.Send(MakeTxProxyID(), proposeRequest.release());
+ }
+
+ STFUNC(StateFunc) {
+ return TBase::StateWork(ev);
+ }
+};
+
+template <typename TDerived>
+class TBaseKeyValueRequest {
+protected:
+ void OnBootstrap() {
+ auto self = static_cast<TDerived*>(this);
+ Ydb::StatusIds::StatusCode status = Ydb::StatusIds::STATUS_CODE_UNSPECIFIED;
+ NYql::TIssues issues;
+ if (!self->ValidateRequest(status, issues)) {
+ self->Reply(status, issues, self->ActorContext());
+ return;
+ }
+ if (const auto& userToken = self->Request_->GetSerializedToken()) {
+ UserToken = new NACLib::TUserToken(userToken);
+ }
+ SendNavigateRequest();
+ }
+
+ void SendNavigateRequest() {
+ auto self = static_cast<TDerived*>(this);
+ auto &rec = *self->GetProtoRequest();
+ auto req = MakeHolder<NSchemeCache::TSchemeCacheNavigate>();
+ auto& entry = req->ResultSet.emplace_back();
+ entry.Path = ::NKikimr::SplitPath(rec.path());
+ entry.RequestType = NSchemeCache::TSchemeCacheNavigate::TEntry::ERequestType::ByPath;
+ entry.ShowPrivatePath = true;
+ entry.SyncVersion = false;
+ req->UserToken = UserToken;
+ req->DatabaseName = self->Request_->GetDatabaseName().GetOrElse("");
+ auto ev = new TEvTxProxySchemeCache::TEvNavigateKeySet(req.Release());
+ self->Send(MakeSchemeCacheID(), ev);
+ }
+
+ bool OnNavigateKeySetResult(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr &ev, ui32 access) {
+ auto self = static_cast<TDerived*>(this);
+ TEvTxProxySchemeCache::TEvNavigateKeySetResult* res = ev->Get();
+ NSchemeCache::TSchemeCacheNavigate *request = res->Request.Get();
+
+ auto ctx = self->ActorContext();
+
+ if (res->Request->ResultSet.size() != 1) {
+ self->Reply(StatusIds::INTERNAL_ERROR, "Received an incorrect answer from SchemeCache.", NKikimrIssues::TIssuesIds::UNEXPECTED, ctx);
+ return false;
+ }
+
+ switch (request->ResultSet[0].Status) {
+ case NSchemeCache::TSchemeCacheNavigate::EStatus::Ok:
+ break;
+ case NSchemeCache::TSchemeCacheNavigate::EStatus::RootUnknown:
+ case NSchemeCache::TSchemeCacheNavigate::EStatus::PathErrorUnknown:
+ self->Reply(StatusIds::SCHEME_ERROR, "Path isn't exist.", NKikimrIssues::TIssuesIds::PATH_NOT_EXIST, ctx);
+ return false;
+ case NSchemeCache::TSchemeCacheNavigate::EStatus::LookupError:
+ case NSchemeCache::TSchemeCacheNavigate::EStatus::RedirectLookupError:
+ self->Reply(StatusIds::UNAVAILABLE, "Database resolve failed with no certain result.", NKikimrIssues::TIssuesIds::RESOLVE_LOOKUP_ERROR, ctx);
+ return false;
+ default:
+ self->Reply(StatusIds::UNAVAILABLE, "Resolve error", NKikimrIssues::TIssuesIds::GENERIC_RESOLVE_ERROR, ctx);
+ return false;
+ }
+
+ if (!self->CheckAccess(CanonizePath(res->Request->ResultSet[0].Path), res->Request->ResultSet[0].SecurityObject, access)) {
+ return false;
+ }
+ if (!request->ResultSet[0].SolomonVolumeInfo) {
+ self->Reply(StatusIds::SCHEME_ERROR, "Table isn't keyvalue.", NKikimrIssues::TIssuesIds::DEFAULT_ERROR, ctx);
+ return false;
+ }
+
+ return true;
+ }
+
+ bool CheckAccess(const TString& path, TIntrusivePtr<TSecurityObject> securityObject, ui32 access) {
+ auto self = static_cast<TDerived*>(this);
+ if (!UserToken || !securityObject) {
+ return true;
+ }
+
+ if (securityObject->CheckAccess(access, *UserToken)) {
+ return true;
+ }
+
+ self->Reply(Ydb::StatusIds::UNAUTHORIZED,
+ TStringBuilder() << "Access denied"
+ << ": for# " << UserToken->GetUserSID()
+ << ", path# " << path
+ << ", access# " << NACLib::AccessRightsToString(access),
+ NKikimrIssues::TIssuesIds::ACCESS_DENIED,
+ self->ActorContext());
+ return false;
+ }
+
+private:
+ TIntrusiveConstPtr<NACLib::TUserToken> UserToken;
+};
+
+class TDescribeVolumeRequest
+ : public TRpcOperationRequestActor<TDescribeVolumeRequest, TEvDescribeVolumeKeyValueRequest>
+ , public TBaseKeyValueRequest<TDescribeVolumeRequest>
+{
+public:
+ using TBase = TRpcOperationRequestActor<TDescribeVolumeRequest, TEvDescribeVolumeKeyValueRequest>;
+ using TBase::TBase;
+
+ friend class TBaseKeyValueRequest<TDescribeVolumeRequest>;
+
+ void Bootstrap(const TActorContext& ctx) {
+ TBase::Bootstrap(ctx);
+ OnBootstrap();
+ Become(&TDescribeVolumeRequest::StateFunc);
+ }
+
+
+protected:
+ STFUNC(StateFunc) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle);
+ default:
+ return TBase::StateFuncBase(ev);
+ }
+ }
+
+ void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr &ev) {
+ TEvTxProxySchemeCache::TEvNavigateKeySetResult* res = ev->Get();
+ NSchemeCache::TSchemeCacheNavigate *request = res->Request.Get();
+
+ if (!OnNavigateKeySetResult(ev, NACLib::DescribeSchema)) {
+ return;
+ }
+
+ const NKikimrSchemeOp::TSolomonVolumeDescription &desc = request->ResultSet[0].SolomonVolumeInfo->Description;
+ Ydb::KeyValue::DescribeVolumeResult result;
+ result.set_path(this->GetProtoRequest()->path());
+ result.set_partition_count(desc.PartitionsSize());
+ this->ReplyWithResult(Ydb::StatusIds::SUCCESS, result, TActivationContext::AsActorContext());
+ }
+
+ bool ValidateRequest(Ydb::StatusIds::StatusCode& /*status*/, NYql::TIssues& /*issues*/) {
+ return true;
+ }
+
+private:
+ TIntrusiveConstPtr<NACLib::TUserToken> UserToken;
+};
+
+
+class TListLocalPartitionsRequest
+ : public TRpcOperationRequestActor<TListLocalPartitionsRequest, TEvListLocalPartitionsKeyValueRequest>
+ , public TBaseKeyValueRequest<TListLocalPartitionsRequest>
+{
+public:
+ using TBase = TRpcOperationRequestActor<TListLocalPartitionsRequest, TEvListLocalPartitionsKeyValueRequest>;
+ using TBase::TBase;
+
+ friend class TBaseKeyValueRequest<TListLocalPartitionsRequest>;
+
+ void Bootstrap(const TActorContext& ctx) {
+ TBase::Bootstrap(ctx);
+ OnBootstrap();
+ Become(&TListLocalPartitionsRequest::StateFunc);
+ }
+
+protected:
+ STFUNC(StateFunc) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle);
+ hFunc(TEvLocal::TEvEnumerateTabletsResult, Handle);
+ default:
+ return TBase::StateFuncBase(ev);
+ }
+ }
+
+ void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr &ev) {
+ TEvTxProxySchemeCache::TEvNavigateKeySetResult* res = ev->Get();
+ NSchemeCache::TSchemeCacheNavigate *request = res->Request.Get();
+
+ if (!OnNavigateKeySetResult(ev, NACLib::DescribeSchema)) {
+ return;
+ }
+
+ const NKikimrSchemeOp::TSolomonVolumeDescription &desc = request->ResultSet[0].SolomonVolumeInfo->Description;
+ for (const NKikimrSchemeOp::TSolomonVolumeDescription::TPartition &partition : desc.GetPartitions()) {
+ TabletIdToPartitionId[partition.GetTabletId()] = partition.GetPartitionId();
+ }
+
+ if (TabletIdToPartitionId.empty()) {
+ Ydb::KeyValue::ListLocalPartitionsResult result;
+ result.set_path(this->GetProtoRequest()->path());
+ result.set_node_id(SelfId().NodeId());
+ this->ReplyWithResult(Ydb::StatusIds::SUCCESS, result, TActivationContext::AsActorContext());
+ return;
+ }
+
+ SendRequest();
+ }
+
+ TActorId MakeLocalRegistrarID() {
+ auto &ctx = TActivationContext::AsActorContext();
+ auto &domainsInfo = *AppData(ctx)->DomainsInfo;
+ auto domainIt = domainsInfo.Domains.find(1);
+ if (domainIt == domainsInfo.Domains.end()) {
+ TActorId invalidId;
+ return invalidId;
+ }
+ auto &rec = *this->GetProtoRequest();
+ ui32 nodeId = rec.node_id() ? rec.node_id() : ctx.SelfID.NodeId();
+ ui32 hiveUid = domainsInfo.GetDefaultHiveUid(1);
+ ui64 hiveId = domainsInfo.GetHive(hiveUid);
+ return ::NKikimr::MakeLocalRegistrarID(nodeId, hiveId);
+ }
+
+ TEvLocal::TEvEnumerateTablets* MakeRequest() {
+ return new TEvLocal::TEvEnumerateTablets(TTabletTypes::KeyValue);
+ }
+
+ void SendRequest() {
+ Send(MakeLocalRegistrarID(), MakeRequest(), IEventHandle::FlagTrackDelivery, 0);
+ }
+
+ void Handle(TEvLocal::TEvEnumerateTabletsResult::TPtr &ev) {
+ const NKikimrLocal::TEvEnumerateTabletsResult &record = ev->Get()->Record;
+ if (!record.HasStatus() || record.GetStatus() != NKikimrProto::OK) {
+ this->Reply(StatusIds::INTERNAL_ERROR, "Received an incorrect answer from Local.", NKikimrIssues::TIssuesIds::UNEXPECTED, this->ActorContext());
+ return;
+ }
+
+ Ydb::KeyValue::ListLocalPartitionsResult result;
+ result.set_path(this->GetProtoRequest()->path());
+ result.set_node_id(SelfId().NodeId());
+ for (auto &item : record.GetTabletInfo()) {
+ if (!item.HasTabletId()) {
+ continue;
+ }
+ auto it = TabletIdToPartitionId.find(item.GetTabletId());
+ if (it != TabletIdToPartitionId.end()) {
+ result.add_partition_ids(it->second);
+ }
+ }
+ this->ReplyWithResult(Ydb::StatusIds::SUCCESS, result, TActivationContext::AsActorContext());
+ }
+
+ bool ValidateRequest(Ydb::StatusIds::StatusCode& /*status*/, NYql::TIssues& /*issues*/) {
+ return true;
+ }
+
+private:
+ THashMap<ui64, ui64> TabletIdToPartitionId;
+};
+
+
+template <typename TDerived, typename TRequest, typename TResultRecord, typename TKVRequest>
+class TKeyValueRequestGrpc
+ : public TRpcOperationRequestActor<TDerived, TRequest>
+ , public TBaseKeyValueRequest<TKeyValueRequestGrpc<TDerived, TRequest, TResultRecord, TKVRequest>>
+{
+public:
+ using TBase = TRpcOperationRequestActor<TDerived, TRequest>;
+ using TBase::TBase;
+
+ friend class TBaseKeyValueRequest<TKeyValueRequestGrpc<TDerived, TRequest, TResultRecord, TKVRequest>>;
+
+ void Bootstrap(const TActorContext& ctx) {
+ TBase::Bootstrap(ctx);
+ this->OnBootstrap();
+ this->Become(&TKeyValueRequestGrpc::StateFunc);
+ }
+
+
+protected:
+ STFUNC(StateFunc) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvTabletPipe::TEvClientConnected, Handle);
+ hFunc(TEvTabletPipe::TEvClientDestroyed, Handle);
+ hFunc(TKVRequest::TResponse, Handle);
+ hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle);
+ default:
+ return TBase::StateFuncBase(ev);
+ }
+ }
+
+ void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr &ev) {
+ TEvTxProxySchemeCache::TEvNavigateKeySetResult* res = ev->Get();
+ NSchemeCache::TSchemeCacheNavigate *request = res->Request.Get();
+
+ if (!this->OnNavigateKeySetResult(ev, static_cast<TDerived*>(this)->GetRequiredAccessRights())) {
+ return;
+ }
+
+ auto &rec = *this->GetProtoRequest();
+ const NKikimrSchemeOp::TSolomonVolumeDescription &desc = request->ResultSet[0].SolomonVolumeInfo->Description;
+
+ if (rec.partition_id() >= desc.PartitionsSize()) {
+ this->Reply(StatusIds::SCHEME_ERROR, "The partition wasn't found. Partition ID was larger or equal partition count.", NKikimrIssues::TIssuesIds::DEFAULT_ERROR, this->ActorContext());
+ return;
+ }
+
+ ui64 partitionId = rec.partition_id();
+ if (const auto &partition = desc.GetPartitions(rec.partition_id()); partition.GetPartitionId() == partitionId) {
+ KVTabletId = partition.GetTabletId();
+ } else {
+ Y_VERIFY_DEBUG(false);
+ for (const NKikimrSchemeOp::TSolomonVolumeDescription::TPartition &partition : desc.GetPartitions()) {
+ if (partition.GetPartitionId() == partitionId) {
+ KVTabletId = partition.GetTabletId();
+ break;
+ }
+ }
+ }
+
+ if (!KVTabletId) {
+ this->Reply(StatusIds::INTERNAL_ERROR, "Partition wasn't found.", NKikimrIssues::TIssuesIds::DEFAULT_ERROR, this->ActorContext());
+ return;
+ }
+
+ CreatePipe();
+ SendRequest();
+ }
+
+ void SendRequest() {
+ std::unique_ptr<TKVRequest> req = std::make_unique<TKVRequest>();
+ auto &rec = *this->GetProtoRequest();
+ CopyProtobuf(rec, &req->Record);
+ req->Record.set_tablet_id(KVTabletId);
+ NTabletPipe::SendData(this->SelfId(), KVPipeClient, req.release(), 0);
+ }
+
+ void Handle(typename TKVRequest::TResponse::TPtr &ev) {
+ TResultRecord result;
+ CopyProtobuf(ev->Get()->Record, &result);
+ auto status = PullStatus(ev->Get()->Record);
+ this->ReplyWithResult(status, result, TActivationContext::AsActorContext());
+ }
+
+ NTabletPipe::TClientConfig GetPipeConfig() {
+ NTabletPipe::TClientConfig cfg;
+ cfg.RetryPolicy = {
+ .RetryLimitCount = 3u
+ };
+ return cfg;
+ }
+
+ void CreatePipe() {
+ KVPipeClient = this->Register(NTabletPipe::CreateClient(this->SelfId(), KVTabletId, GetPipeConfig()));
+ }
+
+ void Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev) {
+ if (ev->Get()->Status != NKikimrProto::OK) {
+ this->Reply(StatusIds::UNAVAILABLE, "Failed to connect to coordination node.", NKikimrIssues::TIssuesIds::SHARD_NOT_AVAILABLE, this->ActorContext());
+ }
+ }
+
+ void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr&) {
+ this->Reply(StatusIds::UNAVAILABLE, "Connection to coordination node was lost.", NKikimrIssues::TIssuesIds::SHARD_NOT_AVAILABLE, this->ActorContext());
+ }
+
+ virtual bool ValidateRequest(Ydb::StatusIds::StatusCode& status, NYql::TIssues& issues) = 0;
+
+ void PassAway() override {
+ if (KVPipeClient) {
+ NTabletPipe::CloseClient(this->SelfId(), KVPipeClient);
+ KVPipeClient = {};
+ }
+ TBase::PassAway();
+ }
+
+protected:
+ ui64 KVTabletId = 0;
+ TActorId KVPipeClient;
+};
+
+class TAcquireLockRequest
+ : public TKeyValueRequestGrpc<TAcquireLockRequest, TEvAcquireLockKeyValueRequest,
+ Ydb::KeyValue::AcquireLockResult, TEvKeyValue::TEvAcquireLock>
+{
+public:
+ using TBase = TKeyValueRequestGrpc<TAcquireLockRequest, TEvAcquireLockKeyValueRequest,
+ Ydb::KeyValue::AcquireLockResult, TEvKeyValue::TEvAcquireLock>;
+ using TBase::TBase;
+
+ bool ValidateRequest(Ydb::StatusIds::StatusCode& /*status*/, NYql::TIssues& /*issues*/) override {
+ return true;
+ }
+ NACLib::EAccessRights GetRequiredAccessRights() const {
+ return NACLib::UpdateRow;
+ }
+};
+
+
+class TExecuteTransactionRequest
+ : public TKeyValueRequestGrpc<TExecuteTransactionRequest, TEvExecuteTransactionKeyValueRequest,
+ Ydb::KeyValue::ExecuteTransactionResult, TEvKeyValue::TEvExecuteTransaction> {
+public:
+ using TBase = TKeyValueRequestGrpc<TExecuteTransactionRequest, TEvExecuteTransactionKeyValueRequest,
+ Ydb::KeyValue::ExecuteTransactionResult, TEvKeyValue::TEvExecuteTransaction>;
+ using TBase::TBase;
+
+ bool ValidateRequest(Ydb::StatusIds::StatusCode& /*status*/, NYql::TIssues& /*issues*/) override {
+ return true;
+ }
+
+ NACLib::EAccessRights GetRequiredAccessRights() const {
+ ui32 accessRights = 0;
+ auto &rec = *this->GetProtoRequest();
+ for (auto &command : rec.commands()) {
+ if (command.has_delete_range()) {
+ accessRights |= NACLib::EraseRow;
+ }
+ if (command.has_rename()) {
+ accessRights |= NACLib::UpdateRow | NACLib::EraseRow;
+ }
+ if (command.has_copy_range()) {
+ accessRights |= NACLib::UpdateRow;
+ }
+ if (command.has_concat() && !command.concat().keep_inputs()) {
+ accessRights |= NACLib::UpdateRow | NACLib::EraseRow;
+ }
+ if (command.has_concat() && command.concat().keep_inputs()) {
+ accessRights |= NACLib::UpdateRow;
+ }
+ if (command.has_write()) {
+ accessRights |= NACLib::UpdateRow;
+ }
+ }
+ return static_cast<NACLib::EAccessRights>(accessRights);
+ }
+};
+
+class TReadRequest
+ : public TKeyValueRequestGrpc<TReadRequest, TEvReadKeyValueRequest,
+ Ydb::KeyValue::ReadResult, TEvKeyValue::TEvRead> {
+public:
+ using TBase = TKeyValueRequestGrpc<TReadRequest, TEvReadKeyValueRequest,
+ Ydb::KeyValue::ReadResult, TEvKeyValue::TEvRead>;
+ using TBase::TBase;
+ using TBase::Handle;
+ STFUNC(StateFunc) {
+ switch (ev->GetTypeRewrite()) {
+ default:
+ return TBase::StateFunc(ev);
+ }
+ }
+ bool ValidateRequest(Ydb::StatusIds::StatusCode& /*status*/, NYql::TIssues& /*issues*/) override {
+ return true;
+ }
+ NACLib::EAccessRights GetRequiredAccessRights() const {
+ return NACLib::SelectRow;
+ }
+};
+
+class TReadRangeRequest
+ : public TKeyValueRequestGrpc<TReadRangeRequest, TEvReadRangeKeyValueRequest,
+ Ydb::KeyValue::ReadRangeResult, TEvKeyValue::TEvReadRange> {
+public:
+ using TBase = TKeyValueRequestGrpc<TReadRangeRequest, TEvReadRangeKeyValueRequest,
+ Ydb::KeyValue::ReadRangeResult, TEvKeyValue::TEvReadRange>;
+ using TBase::TBase;
+ using TBase::Handle;
+ STFUNC(StateFunc) {
+ switch (ev->GetTypeRewrite()) {
+ default:
+ return TBase::StateFunc(ev);
+ }
+ }
+ bool ValidateRequest(Ydb::StatusIds::StatusCode& /*status*/, NYql::TIssues& /*issues*/) override {
+ return true;
+ }
+ NACLib::EAccessRights GetRequiredAccessRights() const {
+ return NACLib::SelectRow;
+ }
+};
+
+class TListRangeRequest
+ : public TKeyValueRequestGrpc<TListRangeRequest, TEvListRangeKeyValueRequest,
+ Ydb::KeyValue::ListRangeResult, TEvKeyValue::TEvReadRange> {
+public:
+ using TBase = TKeyValueRequestGrpc<TListRangeRequest, TEvListRangeKeyValueRequest,
+ Ydb::KeyValue::ListRangeResult, TEvKeyValue::TEvReadRange>;
+ using TBase::TBase;
+ using TBase::Handle;
+ STFUNC(StateFunc) {
+ switch (ev->GetTypeRewrite()) {
+ default:
+ return TBase::StateFunc(ev);
+ }
+ }
+ bool ValidateRequest(Ydb::StatusIds::StatusCode& /*status*/, NYql::TIssues& /*issues*/) override {
+ return true;
+ }
+ NACLib::EAccessRights GetRequiredAccessRights() const {
+ return NACLib::SelectRow;
+ }
+};
+
+class TGetStorageChannelStatusRequest
+ : public TKeyValueRequestGrpc<TGetStorageChannelStatusRequest, TEvGetStorageChannelStatusKeyValueRequest,
+ Ydb::KeyValue::GetStorageChannelStatusResult, TEvKeyValue::TEvGetStorageChannelStatus> {
+public:
+ using TBase = TKeyValueRequestGrpc<TGetStorageChannelStatusRequest, TEvGetStorageChannelStatusKeyValueRequest,
+ Ydb::KeyValue::GetStorageChannelStatusResult, TEvKeyValue::TEvGetStorageChannelStatus>;
+ using TBase::TBase;
+ using TBase::Handle;
+ STFUNC(StateFunc) {
+ switch (ev->GetTypeRewrite()) {
+ default:
+ return TBase::StateFunc(ev);
+ }
+ }
+ bool ValidateRequest(Ydb::StatusIds::StatusCode& /*status*/, NYql::TIssues& /*issues*/) override {
+ return true;
+ }
+ NACLib::EAccessRights GetRequiredAccessRights() const {
+ return NACLib::DescribeSchema;
+ }
+};
+
+}
+
+
+void DoCreateVolumeKeyValue(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
+ TActivationContext::AsActorContext().Register(new TCreateVolumeRequest(p.release()));
+}
+
+void DoDropVolumeKeyValue(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
+ TActivationContext::AsActorContext().Register(new TDropVolumeRequest(p.release()));
+}
+
+void DoAlterVolumeKeyValue(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
+ TActivationContext::AsActorContext().Register(new TAlterVolumeRequest(p.release()));
+}
+
+void DoDescribeVolumeKeyValue(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
+ TActivationContext::AsActorContext().Register(new TDescribeVolumeRequest(p.release()));
+}
+
+void DoListLocalPartitionsKeyValue(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
+ TActivationContext::AsActorContext().Register(new TListLocalPartitionsRequest(p.release()));
+}
+
+void DoAcquireLockKeyValue(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
+ TActivationContext::AsActorContext().Register(new TAcquireLockRequest(p.release()));
+}
+
+void DoExecuteTransactionKeyValue(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
+ TActivationContext::AsActorContext().Register(new TExecuteTransactionRequest(p.release()));
+}
+
+void DoReadKeyValue(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
+ TActivationContext::AsActorContext().Register(new TReadRequest(p.release()));
+}
+
+void DoReadRangeKeyValue(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
+ TActivationContext::AsActorContext().Register(new TReadRangeRequest(p.release()));
+}
+
+void DoListRangeKeyValue(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
+ TActivationContext::AsActorContext().Register(new TListRangeRequest(p.release()));
+}
+
+void DoGetStorageChannelStatusKeyValue(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
+ TActivationContext::AsActorContext().Register(new TGetStorageChannelStatusRequest(p.release()));
+}
+
+} // namespace NKikimr::NGRpcService
diff --git a/ydb/core/grpc_services/service_keyvalue.h b/ydb/core/grpc_services/service_keyvalue.h
new file mode 100644
index 0000000000..9bbb430178
--- /dev/null
+++ b/ydb/core/grpc_services/service_keyvalue.h
@@ -0,0 +1,23 @@
+#pragma once
+
+#include <memory>
+
+namespace NKikimr::NGRpcService {
+
+ class IRequestOpCtx;
+ class IFacilityProvider;
+
+ void DoCreateVolumeKeyValue(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&);
+ void DoDropVolumeKeyValue(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&);
+ void DoAlterVolumeKeyValue(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&);
+ void DoDescribeVolumeKeyValue(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&);
+ void DoListLocalPartitionsKeyValue(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&);
+
+ void DoAcquireLockKeyValue(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&);
+ void DoExecuteTransactionKeyValue(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&);
+ void DoReadKeyValue(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&);
+ void DoReadRangeKeyValue(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&);
+ void DoListRangeKeyValue(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&);
+ void DoGetStorageChannelStatusKeyValue(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&);
+
+} // NKikimr::NGRpcService
diff --git a/ydb/core/grpc_services/ya.make b/ydb/core/grpc_services/ya.make
index 1921bf675a..f7265ec157 100644
--- a/ydb/core/grpc_services/ya.make
+++ b/ydb/core/grpc_services/ya.make
@@ -46,6 +46,7 @@ SRCS(
rpc_import.cpp
rpc_import_data.cpp
rpc_keep_alive.cpp
+ rpc_keyvalue.cpp
rpc_kh_describe.cpp
rpc_kh_snapshots.cpp
rpc_kqp_base.cpp
diff --git a/ydb/services/CMakeLists.txt b/ydb/services/CMakeLists.txt
index 4ec40bc8a3..2073d2df19 100644
--- a/ydb/services/CMakeLists.txt
+++ b/ydb/services/CMakeLists.txt
@@ -15,6 +15,7 @@ add_subdirectory(dynamic_config)
add_subdirectory(ext_index)
add_subdirectory(fq)
add_subdirectory(kesus)
+add_subdirectory(keyvalue)
add_subdirectory(lib)
add_subdirectory(local_discovery)
add_subdirectory(maintenance)
diff --git a/ydb/services/keyvalue/CMakeLists.darwin-x86_64.txt b/ydb/services/keyvalue/CMakeLists.darwin-x86_64.txt
new file mode 100644
index 0000000000..522f6f9097
--- /dev/null
+++ b/ydb/services/keyvalue/CMakeLists.darwin-x86_64.txt
@@ -0,0 +1,24 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+add_subdirectory(ut)
+
+add_library(ydb-services-keyvalue)
+target_link_libraries(ydb-services-keyvalue PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ api-grpc
+ cpp-grpc-server
+ ydb-core-grpc_services
+ core-grpc_services-base
+ core-kesus-tablet
+ ydb-core-keyvalue
+)
+target_sources(ydb-services-keyvalue PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/keyvalue/grpc_service.cpp
+)
diff --git a/ydb/services/keyvalue/CMakeLists.linux-aarch64.txt b/ydb/services/keyvalue/CMakeLists.linux-aarch64.txt
new file mode 100644
index 0000000000..fc41c03b82
--- /dev/null
+++ b/ydb/services/keyvalue/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,25 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+add_subdirectory(ut)
+
+add_library(ydb-services-keyvalue)
+target_link_libraries(ydb-services-keyvalue PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ api-grpc
+ cpp-grpc-server
+ ydb-core-grpc_services
+ core-grpc_services-base
+ core-kesus-tablet
+ ydb-core-keyvalue
+)
+target_sources(ydb-services-keyvalue PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/keyvalue/grpc_service.cpp
+)
diff --git a/ydb/services/keyvalue/CMakeLists.linux-x86_64.txt b/ydb/services/keyvalue/CMakeLists.linux-x86_64.txt
new file mode 100644
index 0000000000..fc41c03b82
--- /dev/null
+++ b/ydb/services/keyvalue/CMakeLists.linux-x86_64.txt
@@ -0,0 +1,25 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+add_subdirectory(ut)
+
+add_library(ydb-services-keyvalue)
+target_link_libraries(ydb-services-keyvalue PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ api-grpc
+ cpp-grpc-server
+ ydb-core-grpc_services
+ core-grpc_services-base
+ core-kesus-tablet
+ ydb-core-keyvalue
+)
+target_sources(ydb-services-keyvalue PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/keyvalue/grpc_service.cpp
+)
diff --git a/ydb/services/keyvalue/CMakeLists.txt b/ydb/services/keyvalue/CMakeLists.txt
new file mode 100644
index 0000000000..f8b31df0c1
--- /dev/null
+++ b/ydb/services/keyvalue/CMakeLists.txt
@@ -0,0 +1,17 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin-x86_64.txt)
+elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
+ include(CMakeLists.windows-x86_64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-x86_64.txt)
+endif()
diff --git a/ydb/services/keyvalue/CMakeLists.windows-x86_64.txt b/ydb/services/keyvalue/CMakeLists.windows-x86_64.txt
new file mode 100644
index 0000000000..522f6f9097
--- /dev/null
+++ b/ydb/services/keyvalue/CMakeLists.windows-x86_64.txt
@@ -0,0 +1,24 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+add_subdirectory(ut)
+
+add_library(ydb-services-keyvalue)
+target_link_libraries(ydb-services-keyvalue PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ api-grpc
+ cpp-grpc-server
+ ydb-core-grpc_services
+ core-grpc_services-base
+ core-kesus-tablet
+ ydb-core-keyvalue
+)
+target_sources(ydb-services-keyvalue PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/keyvalue/grpc_service.cpp
+)
diff --git a/ydb/services/keyvalue/grpc_service.cpp b/ydb/services/keyvalue/grpc_service.cpp
new file mode 100644
index 0000000000..8e3b663027
--- /dev/null
+++ b/ydb/services/keyvalue/grpc_service.cpp
@@ -0,0 +1,81 @@
+#include "grpc_service.h"
+
+#include <ydb/core/grpc_services/grpc_helper.h>
+#include <ydb/core/grpc_services/base/base.h>
+#include <ydb/core/grpc_services/service_keyvalue.h>
+
+
+namespace NKikimr::NGRpcService {
+
+TKeyValueGRpcService::TKeyValueGRpcService(NActors::TActorSystem* actorSystem, TIntrusivePtr<NMonitoring::TDynamicCounters> counters, NActors::TActorId grpcRequestProxyId)
+ : ActorSystem(actorSystem)
+ , Counters(std::move(counters))
+ , GRpcRequestProxyId(grpcRequestProxyId)
+{
+}
+
+TKeyValueGRpcService::~TKeyValueGRpcService() = default;
+
+void TKeyValueGRpcService::InitService(grpc::ServerCompletionQueue* cq, NGrpc::TLoggerPtr logger) {
+ CQ = cq;
+ SetupIncomingRequests(std::move(logger));
+}
+
+void TKeyValueGRpcService::SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) {
+ Limiter = limiter;
+}
+
+bool TKeyValueGRpcService::IncRequest() {
+ return Limiter->Inc();
+}
+
+void TKeyValueGRpcService::DecRequest() {
+ Limiter->Dec();
+}
+
+void TKeyValueGRpcService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) {
+ auto getCounterBlock = NGRpcService::CreateCounterCb(Counters, ActorSystem);
+
+#ifdef SETUP_METHOD
+#error SETUP_METHOD macro collision
+#endif
+
+#define SETUP_METHOD(methodName, method, rlMode) \
+ MakeIntrusive<NGRpcService::TGRpcRequest< \
+ Ydb::KeyValue::Y_CAT(methodName, Request), \
+ Ydb::KeyValue::Y_CAT(methodName, Response), \
+ TKeyValueGRpcService>> \
+ ( \
+ this, \
+ &Service_, \
+ CQ, \
+ [this](NGrpc::IRequestContextBase* reqCtx) { \
+ NGRpcService::ReportGrpcReqToMon(*ActorSystem, reqCtx->GetPeer()); \
+ ActorSystem->Send(GRpcRequestProxyId, new TGrpcRequestOperationCall< \
+ Ydb::KeyValue::Y_CAT(methodName, Request), \
+ Ydb::KeyValue::Y_CAT(methodName, Response)>(reqCtx, &method, \
+ TRequestAuxSettings{rlMode, nullptr})); \
+ }, \
+ &Ydb::KeyValue::V1::KeyValueService::AsyncService::Y_CAT(Request, methodName), \
+ "KeyValue/" Y_STRINGIZE(methodName), \
+ logger, \
+ getCounterBlock("keyvalue", Y_STRINGIZE(methodName)) \
+ )->Run()
+
+ SETUP_METHOD(CreateVolume, DoCreateVolumeKeyValue, TRateLimiterMode::Rps);
+ SETUP_METHOD(DropVolume, DoDropVolumeKeyValue, TRateLimiterMode::Rps);
+ SETUP_METHOD(AlterVolume, DoAlterVolumeKeyValue, TRateLimiterMode::Rps);
+ SETUP_METHOD(DescribeVolume, DoDescribeVolumeKeyValue, TRateLimiterMode::Rps);
+ SETUP_METHOD(ListLocalPartitions, DoListLocalPartitionsKeyValue, TRateLimiterMode::Rps);
+
+ SETUP_METHOD(AcquireLock, DoAcquireLockKeyValue, TRateLimiterMode::Rps);
+ SETUP_METHOD(ExecuteTransaction, DoExecuteTransactionKeyValue, TRateLimiterMode::Rps);
+ SETUP_METHOD(Read, DoReadKeyValue, TRateLimiterMode::Rps);
+ SETUP_METHOD(ReadRange, DoReadRangeKeyValue, TRateLimiterMode::Rps);
+ SETUP_METHOD(ListRange, DoListRangeKeyValue, TRateLimiterMode::Rps);
+ SETUP_METHOD(GetStorageChannelStatus, DoGetStorageChannelStatusKeyValue, TRateLimiterMode::Rps);
+
+#undef SETUP_METHOD
+}
+
+} // namespace NKikimr::NGRpcService
diff --git a/ydb/services/keyvalue/grpc_service.h b/ydb/services/keyvalue/grpc_service.h
new file mode 100644
index 0000000000..18161acad1
--- /dev/null
+++ b/ydb/services/keyvalue/grpc_service.h
@@ -0,0 +1,37 @@
+#pragma once
+
+#include <ydb/public/api/grpc/ydb_keyvalue_v1.grpc.pb.h>
+
+#include <library/cpp/grpc/server/grpc_server.h>
+#include <library/cpp/actors/core/actorsystem.h>
+
+
+namespace NKikimr::NGRpcService {
+
+class TKeyValueGRpcService
+ : public NGrpc::TGrpcServiceBase<Ydb::KeyValue::V1::KeyValueService>
+{
+public:
+ TKeyValueGRpcService(NActors::TActorSystem* actorSystem, TIntrusivePtr<NMonitoring::TDynamicCounters> counters,
+ NActors::TActorId grpcRequestProxyId);
+ ~TKeyValueGRpcService();
+
+ void InitService(grpc::ServerCompletionQueue* cq, NGrpc::TLoggerPtr logger) override;
+ void SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) override;
+
+ bool IncRequest();
+ void DecRequest();
+
+private:
+ void SetupIncomingRequests(NGrpc::TLoggerPtr logger);
+
+private:
+ NActors::TActorSystem* ActorSystem = nullptr;
+ TIntrusivePtr<NMonitoring::TDynamicCounters> Counters;
+ NActors::TActorId GRpcRequestProxyId;
+
+ grpc::ServerCompletionQueue* CQ = nullptr;
+ NGrpc::TGlobalLimiter* Limiter = nullptr;
+};
+
+} // namespace NKikimr::NGRpcService
diff --git a/ydb/services/keyvalue/grpc_service_ut.cpp b/ydb/services/keyvalue/grpc_service_ut.cpp
new file mode 100644
index 0000000000..131c3ac363
--- /dev/null
+++ b/ydb/services/keyvalue/grpc_service_ut.cpp
@@ -0,0 +1,838 @@
+#include "grpc_service.h"
+
+#include <ydb/core/keyvalue/keyvalue.h>
+#include <ydb/core/keyvalue/keyvalue_events.h>
+#include <ydb/core/protos/config.pb.h>
+#include <ydb/core/testlib/basics/appdata.h>
+#include <ydb/core/testlib/test_client.h>
+#include <ydb/core/tx/scheme_cache/scheme_cache.h>
+
+#include <ydb/public/api/grpc/ydb_scheme_v1.grpc.pb.h>
+
+#include <ydb/public/sdk/cpp/client/resources/ydb_resources.h>
+
+#include <library/cpp/grpc/client/grpc_client_low.h>
+#include <library/cpp/testing/unittest/registar.h>
+#include <library/cpp/testing/unittest/tests_data.h>
+#include <library/cpp/logger/backend.h>
+
+#include <grpc++/client_context.h>
+#include <grpc++/create_channel.h>
+
+#include <util/string/builder.h>
+
+
+TString PrintIssue(const ::google::protobuf::RepeatedPtrField< ::Ydb::Issue::IssueMessage> &issues) {
+ TStringBuilder msg;
+ msg << '{';
+ for (auto &issue : issues) {
+ msg << " issue# " << issue.message();
+ }
+ msg << " }";
+ return msg;
+}
+
+
+#define UNIT_ASSERT_CHECK_STATUS(got, exp) \
+ UNIT_ASSERT_C(got.status() == exp, "exp# " << Ydb::StatusIds::StatusCode_Name(exp) \
+ << " got# " << Ydb::StatusIds::StatusCode_Name(got.status()) << " issues# " << PrintIssue(got.issues())) \
+// UNIT_ASSERT_CHECK_STATUS
+
+
+namespace NKikimr::NGRpcService {
+
+
+struct TKikimrTestSettings {
+ static constexpr bool SSL = false;
+ static constexpr bool AUTH = false;
+ static constexpr bool PrecreatePools = true;
+ static constexpr bool EnableSystemViews = true;
+};
+
+struct TKikimrTestWithAuth : TKikimrTestSettings {
+ static constexpr bool AUTH = true;
+};
+
+struct TKikimrTestWithAuthAndSsl : TKikimrTestWithAuth {
+ static constexpr bool SSL = true;
+};
+
+struct TKikimrTestNoSystemViews : TKikimrTestSettings {
+ static constexpr bool EnableSystemViews = false;
+};
+
+template <typename TestSettings = TKikimrTestSettings>
+class TBasicKikimrWithGrpcAndRootSchema {
+public:
+ TBasicKikimrWithGrpcAndRootSchema(
+ NKikimrConfig::TAppConfig appConfig = {},
+ TAutoPtr<TLogBackend> logBackend = {})
+ {
+ ui16 port = PortManager.GetPort(2134);
+ ui16 grpc = PortManager.GetPort(2135);
+ ServerSettings = new Tests::TServerSettings(port);
+ ServerSettings->SetGrpcPort(grpc);
+ ServerSettings->SetLogBackend(logBackend);
+ ServerSettings->SetDomainName("Root");
+ ServerSettings->SetDynamicNodeCount(1);
+ if (TestSettings::PrecreatePools) {
+ ServerSettings->AddStoragePool("ssd");
+ ServerSettings->AddStoragePool("hdd");
+ ServerSettings->AddStoragePool("hdd1");
+ ServerSettings->AddStoragePool("hdd2");
+ } else {
+ ServerSettings->AddStoragePoolType("ssd");
+ ServerSettings->AddStoragePoolType("hdd");
+ ServerSettings->AddStoragePoolType("hdd1");
+ ServerSettings->AddStoragePoolType("hdd2");
+ }
+ ServerSettings->Formats = new TFormatFactory;
+ ServerSettings->FeatureFlags = appConfig.GetFeatureFlags();
+ ServerSettings->RegisterGrpcService<NKikimr::NGRpcService::TKeyValueGRpcService>("keyvalue");
+
+ Server_.Reset(new Tests::TServer(*ServerSettings));
+ Tenants_.Reset(new Tests::TTenants(Server_));
+
+ //Server_->GetRuntime()->SetLogPriority(NKikimrServices::TX_PROXY_SCHEME_CACHE, NActors::NLog::PRI_DEBUG);
+ //Server_->GetRuntime()->SetLogPriority(NKikimrServices::SCHEME_BOARD_REPLICA, NActors::NLog::PRI_DEBUG);
+ //Server_->GetRuntime()->SetLogPriority(NKikimrServices::SCHEME_BOARD_SUBSCRIBER, NActors::NLog::PRI_TRACE);
+ //Server_->GetRuntime()->SetLogPriority(NKikimrServices::SCHEME_BOARD_POPULATOR, NActors::NLog::PRI_DEBUG);
+ //Server_->GetRuntime()->SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_DEBUG);
+ //Server_->GetRuntime()->SetLogPriority(NKikimrServices::TX_PROXY, NActors::NLog::PRI_DEBUG);
+ Server_->GetRuntime()->SetLogPriority(NKikimrServices::KEYVALUE, NActors::NLog::PRI_DEBUG);
+ //Server_->GetRuntime()->SetLogPriority(NKikimrServices::BOOTSTRAPPER, NActors::NLog::PRI_DEBUG);
+ //Server_->GetRuntime()->SetLogPriority(NKikimrServices::STATESTORAGE, NActors::NLog::PRI_DEBUG);
+ //Server_->GetRuntime()->SetLogPriority(NKikimrServices::TABLET_EXECUTOR, NActors::NLog::PRI_DEBUG);
+ //Server_->GetRuntime()->SetLogPriority(NKikimrServices::SAUSAGE_BIO, NActors::NLog::PRI_DEBUG);
+ //Server_->GetRuntime()->SetLogPriority(NKikimrServices::TABLET_FLATBOOT, NActors::NLog::PRI_DEBUG);
+ //Server_->GetRuntime()->SetLogPriority(NKikimrServices::TABLET_OPS_HOST, NActors::NLog::PRI_DEBUG);
+ //Server_->GetRuntime()->SetLogPriority(NKikimrServices::TABLET_SAUSAGECACHE, NActors::NLog::PRI_DEBUG);
+ //Server_->GetRuntime()->SetLogPriority(NKikimrServices::TX_OLAPSHARD, NActors::NLog::PRI_DEBUG);
+ //Server_->GetRuntime()->SetLogPriority(NKikimrServices::TX_COLUMNSHARD, NActors::NLog::PRI_DEBUG);
+
+ NGrpc::TServerOptions grpcOption;
+ if (TestSettings::AUTH) {
+ grpcOption.SetUseAuth(true);
+ }
+ grpcOption.SetPort(grpc);
+ Server_->EnableGRpc(grpcOption);
+
+ Tests::TClient annoyingClient(*ServerSettings);
+ if (ServerSettings->AppConfig.GetDomainsConfig().GetSecurityConfig().GetEnforceUserTokenRequirement()) {
+ annoyingClient.SetSecurityToken("root@builtin");
+ }
+ annoyingClient.InitRootScheme("Root");
+ GRpcPort_ = grpc;
+ }
+
+ ui16 GetPort() {
+ return GRpcPort_;
+ }
+
+ TPortManager& GetPortManager() {
+ return PortManager;
+ }
+
+ void ResetSchemeCache(TString path, ui32 nodeIndex = 0) {
+ TTestActorRuntime* runtime = Server_->GetRuntime();
+ Tests::TClient annoyingClient(*ServerSettings);
+ annoyingClient.RefreshPathCache(runtime, path, nodeIndex);
+ }
+
+ TTestActorRuntime* GetRuntime() {
+ return Server_->GetRuntime();
+ }
+
+ Tests::TServer& GetServer() {
+ return *Server_;
+ }
+
+ Tests::TServerSettings::TPtr ServerSettings;
+ Tests::TServer::TPtr Server_;
+ THolder<Tests::TTenants> Tenants_;
+private:
+ TPortManager PortManager;
+ ui16 GRpcPort_;
+};
+
+using TKikimrWithGrpcAndRootSchema = TBasicKikimrWithGrpcAndRootSchema<TKikimrTestSettings>;
+
+Y_UNIT_TEST_SUITE(KeyValueGRPCService) {
+
+ void InitTablet(TKikimrWithGrpcAndRootSchema &server, ui64 tabletId) {
+ server.GetRuntime()->SetScheduledLimit(100);
+ CreateTestBootstrapper(*server.GetRuntime(),
+ CreateTestTabletInfo(tabletId, TTabletTypes::KeyValue),
+ &CreateKeyValueFlat);
+ NanoSleep(3'000'000'000);
+ }
+
+ void CmdWrite(ui64 tabletId, const TDeque<TString> &keys, const TDeque<TString> &values, TKikimrWithGrpcAndRootSchema &server)
+ {
+ Y_VERIFY(keys.size() == values.size());
+ TAutoPtr<IEventHandle> handle;
+ TEvKeyValue::TEvResponse *result;
+ THolder<TEvKeyValue::TEvRequest> request;
+ TActorId edgeActor = server.GetRuntime()->AllocateEdgeActor();
+ for (i32 retriesLeft = 2; retriesLeft > 0; --retriesLeft) {
+ try {
+ server.GetRuntime()->ResetScheduledCount();
+ request.Reset(new TEvKeyValue::TEvRequest);
+ for (ui64 idx = 0; idx < keys.size(); ++idx) {
+ auto write = request->Record.AddCmdWrite();
+ write->SetKey(keys[idx]);
+ write->SetValue(values[idx]);
+ write->SetStorageChannel(NKikimrClient::TKeyValueRequest::MAIN);
+ write->SetPriority(NKikimrClient::TKeyValueRequest::REALTIME);
+ }
+ server.GetRuntime()->SendToPipe(tabletId, edgeActor, request.Release(), 0, GetPipeConfigWithRetries());
+ result = server.GetRuntime()->GrabEdgeEvent<TEvKeyValue::TEvResponse>(handle);
+ UNIT_ASSERT(result);
+ UNIT_ASSERT(result->Record.HasStatus());
+ UNIT_ASSERT_EQUAL(result->Record.GetStatus(), NMsgBusProxy::MSTATUS_OK);
+ UNIT_ASSERT_VALUES_EQUAL(result->Record.WriteResultSize(), values.size());
+ for (ui64 idx = 0; idx < values.size(); ++idx) {
+ const auto &writeResult = result->Record.GetWriteResult(idx);
+ UNIT_ASSERT(writeResult.HasStatus());
+ UNIT_ASSERT_EQUAL(writeResult.GetStatus(), NKikimrProto::OK);
+ UNIT_ASSERT(writeResult.HasStatusFlags());
+ if (values[idx].size()) {
+ UNIT_ASSERT(writeResult.GetStatusFlags() & ui32(NKikimrBlobStorage::StatusIsValid));
+ }
+ }
+ retriesLeft = 0;
+ } catch (NActors::TSchedulingLimitReachedException) {
+ UNIT_ASSERT(retriesLeft == 2);
+ }
+ }
+ }
+
+ template <typename TCtx>
+ void AdjustCtxForDB(TCtx &ctx) {
+ ctx.AddMetadata(NYdb::YDB_AUTH_TICKET_HEADER, "root@builtin");
+ }
+
+ void MakeDirectory(auto &channel, const TString &path) {
+ std::unique_ptr<Ydb::Scheme::V1::SchemeService::Stub> stub;
+ stub = Ydb::Scheme::V1::SchemeService::NewStub(channel);
+
+ Ydb::Scheme::MakeDirectoryRequest makeDirectoryRequest;
+ makeDirectoryRequest.set_path(path);
+ Ydb::Scheme::MakeDirectoryResponse makeDirectoryResponse;
+ grpc::ClientContext makeDirectoryCtx;
+ AdjustCtxForDB(makeDirectoryCtx);
+ stub->MakeDirectory(&makeDirectoryCtx, makeDirectoryRequest, &makeDirectoryResponse);
+ UNIT_ASSERT_CHECK_STATUS(makeDirectoryResponse.operation(), Ydb::StatusIds::SUCCESS);
+ }
+
+ void MakeTable(auto &channel, const TString &path) {
+ std::unique_ptr<Ydb::KeyValue::V1::KeyValueService::Stub> stub;
+ stub = Ydb::KeyValue::V1::KeyValueService::NewStub(channel);
+
+ Ydb::KeyValue::CreateVolumeRequest createVolumeRequest;
+ createVolumeRequest.set_path(path);
+ createVolumeRequest.set_partition_count(1);
+ auto *storage_config = createVolumeRequest.mutable_storage_config();
+ storage_config->add_channel()->set_media("ssd");
+ storage_config->add_channel()->set_media("ssd");
+ storage_config->add_channel()->set_media("ssd");
+
+ Ydb::KeyValue::CreateVolumeResponse createVolumeResponse;
+ Ydb::KeyValue::CreateVolumeResult createVolumeResult;
+
+ grpc::ClientContext createVolumeCtx;
+ AdjustCtxForDB(createVolumeCtx);
+ stub->CreateVolume(&createVolumeCtx, createVolumeRequest, &createVolumeResponse);
+ UNIT_ASSERT_CHECK_STATUS(createVolumeResponse.operation(), Ydb::StatusIds::SUCCESS);
+ createVolumeResponse.operation().result().UnpackTo(&createVolumeResult);
+ }
+
+ void AlterVolume(auto &channel, const TString &path, ui32 partition_count = 1) {
+ std::unique_ptr<Ydb::KeyValue::V1::KeyValueService::Stub> stub;
+ stub = Ydb::KeyValue::V1::KeyValueService::NewStub(channel);
+
+ Ydb::KeyValue::AlterVolumeRequest alterVolumeRequest;
+ alterVolumeRequest.set_path(path);
+ alterVolumeRequest.set_alter_partition_count(partition_count);
+
+ Ydb::KeyValue::AlterVolumeResponse alterVolumeResponse;
+ Ydb::KeyValue::AlterVolumeResult alterVolumeResult;
+
+ grpc::ClientContext alterVolumeCtx;
+ AdjustCtxForDB(alterVolumeCtx);
+ stub->AlterVolume(&alterVolumeCtx, alterVolumeRequest, &alterVolumeResponse);
+ UNIT_ASSERT_CHECK_STATUS(alterVolumeResponse.operation(), Ydb::StatusIds::SUCCESS);
+ alterVolumeResponse.operation().result().UnpackTo(&alterVolumeResult);
+ }
+
+ void DropVolume(auto &channel, const TString &path) {
+ std::unique_ptr<Ydb::KeyValue::V1::KeyValueService::Stub> stub;
+ stub = Ydb::KeyValue::V1::KeyValueService::NewStub(channel);
+
+ Ydb::KeyValue::DropVolumeRequest dropVolumeRequest;
+ dropVolumeRequest.set_path(path);
+
+ Ydb::KeyValue::DropVolumeResponse dropVolumeResponse;
+ Ydb::KeyValue::DropVolumeResult dropVolumeResult;
+
+ grpc::ClientContext dropVolumeCtx;
+ AdjustCtxForDB(dropVolumeCtx);
+ stub->DropVolume(&dropVolumeCtx, dropVolumeRequest, &dropVolumeResponse);
+ UNIT_ASSERT_CHECK_STATUS(dropVolumeResponse.operation(), Ydb::StatusIds::SUCCESS);
+ dropVolumeResponse.operation().result().UnpackTo(&dropVolumeResult);
+ }
+
+ Ydb::KeyValue::DescribeVolumeResult DescribeVolume(auto &channel, const TString &path) {
+ std::unique_ptr<Ydb::KeyValue::V1::KeyValueService::Stub> stub;
+ stub = Ydb::KeyValue::V1::KeyValueService::NewStub(channel);
+
+ Ydb::KeyValue::DescribeVolumeRequest describeVolumeRequest;
+ describeVolumeRequest.set_path(path);
+
+ Ydb::KeyValue::DescribeVolumeResponse describeVolumeResponse;
+ Ydb::KeyValue::DescribeVolumeResult describeVolumeResult;
+
+ grpc::ClientContext describeVolumeCtx;
+ AdjustCtxForDB(describeVolumeCtx);
+ stub->DescribeVolume(&describeVolumeCtx, describeVolumeRequest, &describeVolumeResponse);
+ UNIT_ASSERT_CHECK_STATUS(describeVolumeResponse.operation(), Ydb::StatusIds::SUCCESS);
+ describeVolumeResponse.operation().result().UnpackTo(&describeVolumeResult);
+ return describeVolumeResult;
+ }
+
+
+ Ydb::Scheme::ListDirectoryResult ListDirectory(auto &channel, const TString &path) {
+ std::unique_ptr<Ydb::Scheme::V1::SchemeService::Stub> stub;
+ stub = Ydb::Scheme::V1::SchemeService::NewStub(channel);
+ Ydb::Scheme::ListDirectoryRequest listDirectoryRequest;
+ listDirectoryRequest.set_path(path);
+
+ Ydb::Scheme::ListDirectoryResult listDirectoryResult;
+ Ydb::Scheme::ListDirectoryResponse listDirectoryResponse;
+
+ grpc::ClientContext listDirectoryCtx;
+ AdjustCtxForDB(listDirectoryCtx);
+ stub->ListDirectory(&listDirectoryCtx, listDirectoryRequest, &listDirectoryResponse);
+
+ UNIT_ASSERT_CHECK_STATUS(listDirectoryResponse.operation(), Ydb::StatusIds::SUCCESS);
+ listDirectoryResponse.operation().result().UnpackTo(&listDirectoryResult);
+ return listDirectoryResult;
+ }
+
+ ui64 AcquireLock( const TString &path, ui64 partitionId, auto &stub) {
+ Ydb::KeyValue::AcquireLockRequest request;
+ request.set_path(path);
+ request.set_partition_id(partitionId);
+
+ Ydb::KeyValue::AcquireLockResponse response;
+ Ydb::KeyValue::AcquireLockResult result;
+
+ grpc::ClientContext ctx;
+ AdjustCtxForDB(ctx);
+ stub->AcquireLock(&ctx, request, &response);
+ UNIT_ASSERT_CHECK_STATUS(response.operation(), Ydb::StatusIds::SUCCESS);
+ response.operation().result().UnpackTo(&result);
+ return result.lock_generation();
+ }
+
+ void WaitTableCreation(TKikimrWithGrpcAndRootSchema &server, const TString &path) {
+ bool again = true;
+ for (ui32 i = 0; i < 10 && again; ++i) {
+ Cerr << "Wait iteration# " << i << Endl;
+ auto req = MakeHolder<NSchemeCache::TSchemeCacheNavigate>();
+ auto& entry = req->ResultSet.emplace_back();
+ entry.Path = SplitPath(path);
+ entry.RequestType = NSchemeCache::TSchemeCacheNavigate::TEntry::ERequestType::ByPath;
+ entry.ShowPrivatePath = true;
+ entry.SyncVersion = false;
+ req->UserToken = new NACLib::TUserToken("root@builtin", {});
+ UNIT_ASSERT(req->UserToken);
+ TActorId edgeActor = server.GetRuntime()->AllocateEdgeActor();
+ auto ev = new TEvTxProxySchemeCache::TEvNavigateKeySet(req.Release());
+ UNIT_ASSERT(ev->Request->UserToken);
+ auto schemeCache = MakeSchemeCacheID();
+ server.GetRuntime()->Send(new IEventHandle(schemeCache, edgeActor, ev));
+
+ TAutoPtr<IEventHandle> handle;
+ auto *result = server.GetRuntime()->GrabEdgeEvent<TEvTxProxySchemeCache::TEvNavigateKeySetResult>(handle);
+ UNIT_ASSERT_VALUES_EQUAL(result->Request->ResultSet.size(), 1);
+ again = result->Request->ResultSet[0].Status != NSchemeCache::TSchemeCacheNavigate::EStatus::Ok;
+ }
+ }
+
+ void MakeSimpleTest(const TString &tablePath,
+ std::function<void(const std::unique_ptr<Ydb::KeyValue::V1::KeyValueService::Stub>&)> func)
+ {
+ TKikimrWithGrpcAndRootSchema server;
+ ui16 grpc = server.GetPort();
+ TString location = TStringBuilder() << "localhost:" << grpc;
+
+ ////////////////////////////////////////////////////////////////////////
+
+ std::shared_ptr<grpc::Channel> channel;
+ std::unique_ptr<Ydb::KeyValue::V1::KeyValueService::Stub> stub;
+ channel = grpc::CreateChannel("localhost:" + ToString(grpc), grpc::InsecureChannelCredentials());
+ MakeDirectory(channel, "/Root/mydb");
+ MakeTable(channel, tablePath);
+ auto pr = SplitPath(tablePath);
+ Ydb::Scheme::ListDirectoryResult listDirectoryResult = ListDirectory(channel, "/Root/mydb");
+ UNIT_ASSERT_VALUES_EQUAL(listDirectoryResult.self().name(), "mydb");
+ UNIT_ASSERT_VALUES_EQUAL(listDirectoryResult.children(0).name(), pr.back());
+
+ WaitTableCreation(server, tablePath);
+ stub = Ydb::KeyValue::V1::KeyValueService::NewStub(channel);
+ func(stub);
+ }
+
+ Y_UNIT_TEST(SimpleAcquireLock) {
+ TString tablePath = "/Root/mydb/kvtable";
+ MakeSimpleTest(tablePath, [tablePath](const std::unique_ptr<Ydb::KeyValue::V1::KeyValueService::Stub> &stub){
+ Ydb::KeyValue::AcquireLockRequest request;
+ request.set_path(tablePath);
+ request.set_partition_id(0);
+ Ydb::KeyValue::AcquireLockResponse response;
+ Ydb::KeyValue::AcquireLockResult result;
+
+ grpc::ClientContext ctx1;
+ AdjustCtxForDB(ctx1);
+ stub->AcquireLock(&ctx1, request, &response);
+ UNIT_ASSERT_CHECK_STATUS(response.operation(), Ydb::StatusIds::SUCCESS);
+ response.operation().result().UnpackTo(&result);
+ UNIT_ASSERT(result.lock_generation() == 1);
+
+ grpc::ClientContext ctx2;
+ AdjustCtxForDB(ctx2);
+ stub->AcquireLock(&ctx2, request, &response);
+ UNIT_ASSERT_CHECK_STATUS(response.operation(), Ydb::StatusIds::SUCCESS);
+ response.operation().result().UnpackTo(&result);
+ UNIT_ASSERT(result.lock_generation() == 2);
+ });
+ }
+
+ Y_UNIT_TEST(SimpleExecuteTransaction) {
+ TString tablePath = "/Root/mydb/kvtable";
+ MakeSimpleTest(tablePath, [tablePath](const std::unique_ptr<Ydb::KeyValue::V1::KeyValueService::Stub> &stub){
+ Ydb::KeyValue::ExecuteTransactionRequest request;
+ request.set_path(tablePath);
+ request.set_partition_id(0);
+ Ydb::KeyValue::ExecuteTransactionResponse response;
+
+ grpc::ClientContext ctx;
+ AdjustCtxForDB(ctx);
+ stub->ExecuteTransaction(&ctx, request, &response);
+ UNIT_ASSERT_CHECK_STATUS(response.operation(), Ydb::StatusIds::SUCCESS);
+ });
+ }
+
+ Y_UNIT_TEST(SimpleExecuteTransactionWithWrongGeneration) {
+ TString tablePath = "/Root/mydb/kvtable";
+ MakeSimpleTest(tablePath, [tablePath](const std::unique_ptr<Ydb::KeyValue::V1::KeyValueService::Stub> &stub){
+ Ydb::KeyValue::ExecuteTransactionRequest request;
+ request.set_path(tablePath);
+ request.set_partition_id(0);
+ request.set_lock_generation(42);
+ Ydb::KeyValue::ExecuteTransactionResponse response;
+
+ grpc::ClientContext ctx;
+ AdjustCtxForDB(ctx);
+ stub->ExecuteTransaction(&ctx, request, &response);
+ UNIT_ASSERT_CHECK_STATUS(response.operation(), Ydb::StatusIds::PRECONDITION_FAILED);
+ });
+ }
+
+ Ydb::KeyValue::ExecuteTransactionResult Write(const TString &path, ui64 partitionId, const TString &key, const TString &value, ui64 storageChannel,
+ const std::unique_ptr<Ydb::KeyValue::V1::KeyValueService::Stub> &stub)
+ {
+ Ydb::KeyValue::ExecuteTransactionRequest writeRequest;
+ writeRequest.set_path(path);
+ writeRequest.set_partition_id(partitionId);
+ auto *cmd = writeRequest.add_commands();
+ auto *write = cmd->mutable_write();
+ write->set_key(key);
+ write->set_value(value);
+ write->set_storage_channel(storageChannel);
+ Ydb::KeyValue::ExecuteTransactionResponse writeResponse;
+
+ grpc::ClientContext writeCtx;
+ AdjustCtxForDB(writeCtx);
+ stub->ExecuteTransaction(&writeCtx, writeRequest, &writeResponse);
+ UNIT_ASSERT_CHECK_STATUS(writeResponse.operation(), Ydb::StatusIds::SUCCESS);
+ Ydb::KeyValue::ExecuteTransactionResult writeResult;
+ writeResponse.operation().result().UnpackTo(&writeResult);
+ return writeResult;
+ }
+
+ void Rename(const TString &path, ui64 partitionId, const TString &oldKey, const TString &newKey,
+ const std::unique_ptr<Ydb::KeyValue::V1::KeyValueService::Stub> &stub)
+ {
+ Ydb::KeyValue::ExecuteTransactionRequest request;
+ request.set_path(path);
+ request.set_partition_id(partitionId);
+ auto *cmd = request.add_commands();
+ auto *rename = cmd->mutable_rename();
+ rename->set_old_key(oldKey);
+ rename->set_new_key(newKey);
+ Ydb::KeyValue::ExecuteTransactionResponse response;
+
+ grpc::ClientContext ctx;
+ AdjustCtxForDB(ctx);
+ stub->ExecuteTransaction(&ctx, request, &response);
+ UNIT_ASSERT_CHECK_STATUS(response.operation(), Ydb::StatusIds::SUCCESS);
+ }
+
+
+ Y_UNIT_TEST(SimpleRenameUnexistedKey) {
+ TString tablePath = "/Root/mydb/kvtable";
+ MakeSimpleTest(tablePath, [tablePath](const std::unique_ptr<Ydb::KeyValue::V1::KeyValueService::Stub> &stub){
+ Ydb::KeyValue::ExecuteTransactionRequest request;
+ request.set_path(tablePath);
+ request.set_partition_id(0);
+ auto *cmd = request.add_commands();
+ auto *rename = cmd->mutable_rename();
+ rename->set_old_key("key1");
+ rename->set_new_key("key2");
+ Ydb::KeyValue::ExecuteTransactionResponse response;
+
+ grpc::ClientContext ctx;
+ AdjustCtxForDB(ctx);
+ stub->ExecuteTransaction(&ctx, request, &response);
+ UNIT_ASSERT_CHECK_STATUS(response.operation(), Ydb::StatusIds::NOT_FOUND);
+ });
+ }
+
+ Y_UNIT_TEST(SimpleConcatUnexistedKey) {
+ TString tablePath = "/Root/mydb/kvtable";
+ MakeSimpleTest(tablePath, [tablePath](const std::unique_ptr<Ydb::KeyValue::V1::KeyValueService::Stub> &stub){
+ Ydb::KeyValue::ExecuteTransactionRequest request;
+ request.set_path(tablePath);
+ request.set_partition_id(0);
+ auto *cmd = request.add_commands();
+ auto *concat = cmd->mutable_concat();
+ concat->add_input_keys("key1");
+ concat->add_input_keys("key2");
+ concat->set_output_key("key3");
+ Ydb::KeyValue::ExecuteTransactionResponse response;
+
+ grpc::ClientContext ctx;
+ AdjustCtxForDB(ctx);
+ stub->ExecuteTransaction(&ctx, request, &response);
+ UNIT_ASSERT_CHECK_STATUS(response.operation(), Ydb::StatusIds::NOT_FOUND);
+ });
+ }
+
+ Y_UNIT_TEST(SimpleCopyUnexistedKey) {
+ TString tablePath = "/Root/mydb/kvtable";
+ MakeSimpleTest(tablePath, [tablePath](const std::unique_ptr<Ydb::KeyValue::V1::KeyValueService::Stub> &stub){
+ Ydb::KeyValue::ExecuteTransactionRequest request;
+ request.set_path(tablePath);
+ request.set_partition_id(0);
+ auto *cmd = request.add_commands();
+ auto *rename = cmd->mutable_copy_range();
+ auto *range = rename->mutable_range();
+ range->set_from_key_inclusive("key1");
+ range->set_to_key_inclusive("key2");
+ rename->set_prefix_to_add("A");
+ Ydb::KeyValue::ExecuteTransactionResponse response;
+
+ grpc::ClientContext ctx;
+ AdjustCtxForDB(ctx);
+ stub->ExecuteTransaction(&ctx, request, &response);
+ UNIT_ASSERT_CHECK_STATUS(response.operation(), Ydb::StatusIds::SUCCESS);
+ });
+ }
+
+ Y_UNIT_TEST(SimpleWriteRead) {
+ TString tablePath = "/Root/mydb/kvtable";
+ MakeSimpleTest(tablePath, [tablePath](const std::unique_ptr<Ydb::KeyValue::V1::KeyValueService::Stub> &stub){
+ Write(tablePath, 0, "key", "value", 0, stub);
+
+ Ydb::KeyValue::ReadRequest readRequest;
+ readRequest.set_path(tablePath);
+ readRequest.set_partition_id(0);
+ readRequest.set_key("key");
+ Ydb::KeyValue::ReadResponse readResponse;
+ Ydb::KeyValue::ReadResult readResult;
+
+ grpc::ClientContext readCtx;
+ AdjustCtxForDB(readCtx);
+ stub->Read(&readCtx, readRequest, &readResponse);
+ UNIT_ASSERT_CHECK_STATUS(readResponse.operation(), Ydb::StatusIds::SUCCESS);
+ readResponse.operation().result().UnpackTo(&readResult);
+ UNIT_ASSERT(!readResult.is_overrun());
+ UNIT_ASSERT_VALUES_EQUAL(readResult.requested_key(), "key");
+ UNIT_ASSERT_VALUES_EQUAL(readResult.value(), "value");
+ UNIT_ASSERT_VALUES_EQUAL(readResult.requested_offset(), 0);
+ UNIT_ASSERT_VALUES_EQUAL(readResult.requested_size(), 5);
+ });
+ }
+
+ Y_UNIT_TEST(SimpleWriteReadWithIncorreectPath) {
+ TString tablePath = "/Root/mydb/kvtable";
+ MakeSimpleTest(tablePath, [tablePath](const std::unique_ptr<Ydb::KeyValue::V1::KeyValueService::Stub> &stub){
+ Write(tablePath, 0, "key", "value", 0, stub);
+
+ Ydb::KeyValue::ReadRequest readRequest;
+ readRequest.set_path("/Root/mydb/table");
+ readRequest.set_partition_id(0);
+ readRequest.set_key("key");
+ Ydb::KeyValue::ReadResponse readResponse;
+ Ydb::KeyValue::ReadResult readResult;
+
+ grpc::ClientContext readCtx;
+ AdjustCtxForDB(readCtx);
+ stub->Read(&readCtx, readRequest, &readResponse);
+ UNIT_ASSERT_CHECK_STATUS(readResponse.operation(), Ydb::StatusIds::SCHEME_ERROR);
+ });
+ }
+
+ Y_UNIT_TEST(SimpleWriteReadWithoutToken) {
+ TString tablePath = "/Root/mydb/kvtable";
+ MakeSimpleTest(tablePath, [tablePath](const std::unique_ptr<Ydb::KeyValue::V1::KeyValueService::Stub> &stub){
+ return;
+ Write(tablePath, 0, "key", "value", 0, stub);
+
+ Ydb::KeyValue::ReadRequest readRequest;
+ readRequest.set_path("/Root/mydb/kvtable");
+ readRequest.set_partition_id(0);
+ readRequest.set_key("key");
+ Ydb::KeyValue::ReadResponse readResponse;
+ Ydb::KeyValue::ReadResult readResult;
+
+ grpc::ClientContext readCtx;
+ //AdjustCtxForDB(readCtx);
+ stub->Read(&readCtx, readRequest, &readResponse);
+ UNIT_ASSERT_CHECK_STATUS(readResponse.operation(), Ydb::StatusIds::SCHEME_ERROR);
+ });
+ }
+
+ Y_UNIT_TEST(SimpleWriteReadWithoutLockGeneration1) {
+ TString tablePath = "/Root/mydb/kvtable";
+ MakeSimpleTest(tablePath, [tablePath](const std::unique_ptr<Ydb::KeyValue::V1::KeyValueService::Stub> &stub){
+ AcquireLock(tablePath, 0, stub);
+ Write(tablePath, 0, "key", "value", 0, stub);
+ Ydb::KeyValue::ReadRequest readRequest;
+ readRequest.set_path(tablePath);
+ readRequest.set_partition_id(0);
+ readRequest.set_key("key");
+ Ydb::KeyValue::ReadResponse readResponse;
+ Ydb::KeyValue::ReadResult readResult;
+
+ grpc::ClientContext readCtx;
+ AdjustCtxForDB(readCtx);
+ stub->Read(&readCtx, readRequest, &readResponse);
+ UNIT_ASSERT_CHECK_STATUS(readResponse.operation(), Ydb::StatusIds::SUCCESS);
+ });
+ }
+
+ Y_UNIT_TEST(SimpleWriteReadWithoutLockGeneration2) {
+ TString tablePath = "/Root/mydb/kvtable";
+ MakeSimpleTest(tablePath, [tablePath](const std::unique_ptr<Ydb::KeyValue::V1::KeyValueService::Stub> &stub){
+ Write(tablePath, 0, "key", "value", 0, stub);
+ AcquireLock(tablePath, 0, stub);
+ Ydb::KeyValue::ReadRequest readRequest;
+ readRequest.set_path(tablePath);
+ readRequest.set_partition_id(0);
+ readRequest.set_key("key");
+ Ydb::KeyValue::ReadResponse readResponse;
+ Ydb::KeyValue::ReadResult readResult;
+
+ grpc::ClientContext readCtx;
+ AdjustCtxForDB(readCtx);
+ stub->Read(&readCtx, readRequest, &readResponse);
+ UNIT_ASSERT_CHECK_STATUS(readResponse.operation(), Ydb::StatusIds::SUCCESS);
+ });
+ }
+
+ Y_UNIT_TEST(SimpleWriteReadOverrun) {
+ TString tablePath = "/Root/mydb/kvtable";
+ MakeSimpleTest(tablePath, [tablePath](const std::unique_ptr<Ydb::KeyValue::V1::KeyValueService::Stub> &stub){
+ Write(tablePath, 0, "key", "value", 0, stub);
+
+ Ydb::KeyValue::ReadRequest readRequest;
+ readRequest.set_path(tablePath);
+ readRequest.set_partition_id(0);
+ readRequest.set_key("key");
+ ui64 limitBytes = 1 + 5 + 3 // Key id, length
+ + 1 + 5 + 1 // Value id, length, value
+ + 1 + 8 // Offset id, value
+ + 1 + 8 // Size id, value
+ + 1 + 1 // Status id, value
+ ;
+ readRequest.set_limit_bytes(limitBytes);
+ Ydb::KeyValue::ReadResponse readResponse;
+ Ydb::KeyValue::ReadResult readResult;
+
+ grpc::ClientContext readCtx;
+ AdjustCtxForDB(readCtx);
+ stub->Read(&readCtx, readRequest, &readResponse);
+ UNIT_ASSERT_CHECK_STATUS(readResponse.operation(), Ydb::StatusIds::SUCCESS);
+ readResponse.operation().result().UnpackTo(&readResult);
+ UNIT_ASSERT(readResult.is_overrun());
+ UNIT_ASSERT_VALUES_EQUAL(readResult.requested_key(), "key");
+ UNIT_ASSERT_VALUES_EQUAL(readResult.value(), "v");
+ UNIT_ASSERT_VALUES_EQUAL(readResult.requested_offset(), 0);
+ UNIT_ASSERT_VALUES_EQUAL(readResult.requested_size(), 5);
+ });
+ }
+
+ Y_UNIT_TEST(SimpleWriteReadRange) {
+ TString tablePath = "/Root/mydb/kvtable";
+ MakeSimpleTest(tablePath, [tablePath](const std::unique_ptr<Ydb::KeyValue::V1::KeyValueService::Stub> &stub){
+ Write(tablePath, 0, "key1", "value1", 1, stub);
+ Write(tablePath, 0, "key2", "value12", 2, stub);
+
+ Ydb::KeyValue::ReadRangeRequest readRangeRequest;
+ readRangeRequest.set_path(tablePath);
+ readRangeRequest.set_partition_id(0);
+ auto *r = readRangeRequest.mutable_range();
+ r->set_from_key_inclusive("key1");
+ r->set_to_key_inclusive("key3");
+ Ydb::KeyValue::ReadRangeResponse readRangeResponse;
+ Ydb::KeyValue::ReadRangeResult readRangeResult;
+
+ grpc::ClientContext readRangeCtx;
+ AdjustCtxForDB(readRangeCtx);
+ stub->ReadRange(&readRangeCtx, readRangeRequest, &readRangeResponse);
+ UNIT_ASSERT_CHECK_STATUS(readRangeResponse.operation(), Ydb::StatusIds::SUCCESS);
+ readRangeResponse.operation().result().UnpackTo(&readRangeResult);
+
+ UNIT_ASSERT_VALUES_EQUAL(readRangeResult.pair(0).key(), "key1");
+ UNIT_ASSERT_VALUES_EQUAL(readRangeResult.pair(1).key(), "key2");
+
+ UNIT_ASSERT_VALUES_EQUAL(readRangeResult.pair(0).value(), "value1");
+ UNIT_ASSERT_VALUES_EQUAL(readRangeResult.pair(1).value(), "value12");
+
+ UNIT_ASSERT_VALUES_EQUAL(readRangeResult.pair(0).storage_channel(), 1);
+ UNIT_ASSERT_VALUES_EQUAL(readRangeResult.pair(1).storage_channel(), 2);
+ });
+ }
+
+
+ Y_UNIT_TEST(SimpleWriteListRange) {
+ TString tablePath = "/Root/mydb/kvtable";
+ MakeSimpleTest(tablePath, [tablePath](const std::unique_ptr<Ydb::KeyValue::V1::KeyValueService::Stub> &stub){
+ Write(tablePath, 0, "key1", "value1", 1, stub);
+ Write(tablePath, 0, "key2", "value12", 2, stub);
+
+ Ydb::KeyValue::ListRangeRequest listRangeRequest;
+ listRangeRequest.set_path(tablePath);
+ listRangeRequest.set_partition_id(0);
+ auto *r = listRangeRequest.mutable_range();
+ r->set_from_key_inclusive("key1");
+ r->set_to_key_inclusive("key3");
+ Ydb::KeyValue::ListRangeResponse listRangeResponse;
+ Ydb::KeyValue::ListRangeResult listRangeResult;
+
+ grpc::ClientContext listRangeCtx;
+ AdjustCtxForDB(listRangeCtx);
+ stub->ListRange(&listRangeCtx, listRangeRequest, &listRangeResponse);
+ UNIT_ASSERT_CHECK_STATUS(listRangeResponse.operation(), Ydb::StatusIds::SUCCESS);
+ listRangeResponse.operation().result().UnpackTo(&listRangeResult);
+
+ UNIT_ASSERT_VALUES_EQUAL(listRangeResult.key(0).key(), "key1");
+ UNIT_ASSERT_VALUES_EQUAL(listRangeResult.key(1).key(), "key2");
+
+ UNIT_ASSERT_VALUES_EQUAL(listRangeResult.key(0).value_size(), 6);
+ UNIT_ASSERT_VALUES_EQUAL(listRangeResult.key(1).value_size(), 7);
+
+ UNIT_ASSERT_VALUES_EQUAL(listRangeResult.key(0).storage_channel(), 1);
+ UNIT_ASSERT_VALUES_EQUAL(listRangeResult.key(1).storage_channel(), 2);
+ });
+ }
+
+
+ Y_UNIT_TEST(SimpleGetStorageChannelStatus) {
+ TString tablePath = "/Root/mydb/kvtable";
+ MakeSimpleTest(tablePath, [tablePath](const std::unique_ptr<Ydb::KeyValue::V1::KeyValueService::Stub> &stub){
+ Ydb::KeyValue::GetStorageChannelStatusRequest getStatusRequest;
+ getStatusRequest.set_path(tablePath);
+ getStatusRequest.set_partition_id(0);
+ getStatusRequest.add_storage_channel(1);
+ getStatusRequest.add_storage_channel(2);
+ getStatusRequest.add_storage_channel(3);
+ Ydb::KeyValue::GetStorageChannelStatusResponse getStatusResponse;
+ Ydb::KeyValue::GetStorageChannelStatusResult getStatusResult;
+
+ grpc::ClientContext getStatusCtx;
+ AdjustCtxForDB(getStatusCtx);
+ stub->GetStorageChannelStatus(&getStatusCtx, getStatusRequest, &getStatusResponse);
+ UNIT_ASSERT_CHECK_STATUS(getStatusResponse.operation(), Ydb::StatusIds::SUCCESS);
+ getStatusResponse.operation().result().UnpackTo(&getStatusResult);
+ UNIT_ASSERT_VALUES_EQUAL(getStatusResult.storage_channel_info_size(), 3);
+ });
+ }
+
+ Y_UNIT_TEST(SimpleCreateAlterDropVolume) {
+ TKikimrWithGrpcAndRootSchema server;
+ ui16 grpc = server.GetPort();
+ TString location = TStringBuilder() << "localhost:" << grpc;
+
+ std::shared_ptr<grpc::Channel> channel;
+ channel = grpc::CreateChannel("localhost:" + ToString(grpc), grpc::InsecureChannelCredentials());
+
+ TString path = "/Root/mydb/";
+ TString tablePath = "/Root/mydb/mytable";
+ MakeDirectory(channel, path);
+ MakeTable(channel, tablePath);
+
+ Ydb::Scheme::ListDirectoryResult listDirectoryResult = ListDirectory(channel, path);
+ UNIT_ASSERT_VALUES_EQUAL(listDirectoryResult.self().name(), "mydb");
+ UNIT_ASSERT_VALUES_EQUAL(listDirectoryResult.children(0).name(), "mytable");
+
+ UNIT_ASSERT_VALUES_EQUAL(1, DescribeVolume(channel, tablePath).partition_count());
+
+ AlterVolume(channel, tablePath, 2);
+ listDirectoryResult = ListDirectory(channel, path);
+ UNIT_ASSERT_VALUES_EQUAL(listDirectoryResult.self().name(), "mydb");
+ UNIT_ASSERT_VALUES_EQUAL(listDirectoryResult.children(0).name(), "mytable");
+
+
+ UNIT_ASSERT_VALUES_EQUAL(2, DescribeVolume(channel, tablePath).partition_count());
+
+ DropVolume(channel, tablePath);
+ listDirectoryResult = ListDirectory(channel, path);
+ UNIT_ASSERT_VALUES_EQUAL(listDirectoryResult.self().name(), "mydb");
+ UNIT_ASSERT_VALUES_EQUAL(listDirectoryResult.children_size(), 0);
+ }
+
+ Y_UNIT_TEST(SimpleListPartitions) {
+ return; // delete it after adding ydb_token to requests in tests
+ TKikimrWithGrpcAndRootSchema server;
+ ui16 grpc = server.GetPort();
+ TString location = TStringBuilder() << "localhost:" << grpc;
+
+ std::shared_ptr<grpc::Channel> channel;
+ channel = grpc::CreateChannel("localhost:" + ToString(grpc), grpc::InsecureChannelCredentials());
+
+ TString path = "/Root/mydb/";
+ TString tablePath = "/Root/mydb/mytable";
+ MakeDirectory(channel, path);
+ MakeTable(channel, tablePath);
+
+ std::unique_ptr<Ydb::KeyValue::V1::KeyValueService::Stub> stub;
+ stub = Ydb::KeyValue::V1::KeyValueService::NewStub(channel);
+
+ Write(tablePath, 0, "key1", "value1", 1, stub);
+
+ Ydb::KeyValue::ListLocalPartitionsRequest enumerateRequest;
+ enumerateRequest.set_path(tablePath);
+ enumerateRequest.set_node_id(2);
+
+ Ydb::KeyValue::ListLocalPartitionsResult enumerateResult;
+ Ydb::KeyValue::ListLocalPartitionsResponse eumerateResponse;
+
+ grpc::ClientContext enumerateCtx;
+ AdjustCtxForDB(enumerateCtx);
+ stub->ListLocalPartitions(&enumerateCtx, enumerateRequest, &eumerateResponse);
+
+ UNIT_ASSERT_CHECK_STATUS(eumerateResponse.operation(), Ydb::StatusIds::SUCCESS);
+ eumerateResponse.operation().result().UnpackTo(&enumerateResult);
+ UNIT_ASSERT_VALUES_EQUAL(enumerateResult.partition_ids_size(), 1);
+
+ auto writeRes = Write(tablePath, enumerateResult.partition_ids(0), "key2", "value2", 1, stub);
+ UNIT_ASSERT_VALUES_EQUAL(writeRes.node_id(), 2);
+ }
+
+} // Y_UNIT_TEST_SUITE(KeyValueGRPCService)
+
+} // NKikimr::NGRpcService
diff --git a/ydb/services/keyvalue/ut/CMakeLists.darwin-x86_64.txt b/ydb/services/keyvalue/ut/CMakeLists.darwin-x86_64.txt
new file mode 100644
index 0000000000..2fbf1f4e2d
--- /dev/null
+++ b/ydb/services/keyvalue/ut/CMakeLists.darwin-x86_64.txt
@@ -0,0 +1,80 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_executable(ydb-services-keyvalue-ut)
+target_compile_options(ydb-services-keyvalue-ut PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_include_directories(ydb-services-keyvalue-ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/keyvalue
+)
+target_link_libraries(ydb-services-keyvalue-ut PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ library-cpp-cpuid_check
+ cpp-testing-unittest_main
+ ydb-services-keyvalue
+ library-cpp-logger
+ ydb-core-protos
+ core-testlib-default
+)
+target_link_options(ydb-services-keyvalue-ut PRIVATE
+ -Wl,-platform_version,macos,11.0,11.0
+ -fPIC
+ -fPIC
+ -framework
+ CoreFoundation
+)
+target_sources(ydb-services-keyvalue-ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/keyvalue/grpc_service_ut.cpp
+)
+set_property(
+ TARGET
+ ydb-services-keyvalue-ut
+ PROPERTY
+ SPLIT_FACTOR
+ 1
+)
+add_yunittest(
+ NAME
+ ydb-services-keyvalue-ut
+ TEST_TARGET
+ ydb-services-keyvalue-ut
+ TEST_ARG
+ --print-before-suite
+ --print-before-test
+ --fork-tests
+ --print-times
+ --show-fails
+)
+set_yunittest_property(
+ TEST
+ ydb-services-keyvalue-ut
+ PROPERTY
+ LABELS
+ MEDIUM
+)
+set_yunittest_property(
+ TEST
+ ydb-services-keyvalue-ut
+ PROPERTY
+ PROCESSORS
+ 1
+)
+set_yunittest_property(
+ TEST
+ ydb-services-keyvalue-ut
+ PROPERTY
+ TIMEOUT
+ 100
+)
+target_allocator(ydb-services-keyvalue-ut
+ system_allocator
+)
+vcs_info(ydb-services-keyvalue-ut)
diff --git a/ydb/services/keyvalue/ut/CMakeLists.linux-aarch64.txt b/ydb/services/keyvalue/ut/CMakeLists.linux-aarch64.txt
new file mode 100644
index 0000000000..0d0ad06718
--- /dev/null
+++ b/ydb/services/keyvalue/ut/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,83 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_executable(ydb-services-keyvalue-ut)
+target_compile_options(ydb-services-keyvalue-ut PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_include_directories(ydb-services-keyvalue-ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/keyvalue
+)
+target_link_libraries(ydb-services-keyvalue-ut PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ cpp-testing-unittest_main
+ ydb-services-keyvalue
+ library-cpp-logger
+ ydb-core-protos
+ core-testlib-default
+)
+target_link_options(ydb-services-keyvalue-ut PRIVATE
+ -ldl
+ -lrt
+ -Wl,--no-as-needed
+ -fPIC
+ -fPIC
+ -lpthread
+ -lrt
+ -ldl
+)
+target_sources(ydb-services-keyvalue-ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/keyvalue/grpc_service_ut.cpp
+)
+set_property(
+ TARGET
+ ydb-services-keyvalue-ut
+ PROPERTY
+ SPLIT_FACTOR
+ 1
+)
+add_yunittest(
+ NAME
+ ydb-services-keyvalue-ut
+ TEST_TARGET
+ ydb-services-keyvalue-ut
+ TEST_ARG
+ --print-before-suite
+ --print-before-test
+ --fork-tests
+ --print-times
+ --show-fails
+)
+set_yunittest_property(
+ TEST
+ ydb-services-keyvalue-ut
+ PROPERTY
+ LABELS
+ MEDIUM
+)
+set_yunittest_property(
+ TEST
+ ydb-services-keyvalue-ut
+ PROPERTY
+ PROCESSORS
+ 1
+)
+set_yunittest_property(
+ TEST
+ ydb-services-keyvalue-ut
+ PROPERTY
+ TIMEOUT
+ 100
+)
+target_allocator(ydb-services-keyvalue-ut
+ cpp-malloc-jemalloc
+)
+vcs_info(ydb-services-keyvalue-ut)
diff --git a/ydb/services/keyvalue/ut/CMakeLists.linux-x86_64.txt b/ydb/services/keyvalue/ut/CMakeLists.linux-x86_64.txt
new file mode 100644
index 0000000000..baa6125f02
--- /dev/null
+++ b/ydb/services/keyvalue/ut/CMakeLists.linux-x86_64.txt
@@ -0,0 +1,85 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_executable(ydb-services-keyvalue-ut)
+target_compile_options(ydb-services-keyvalue-ut PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_include_directories(ydb-services-keyvalue-ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/keyvalue
+)
+target_link_libraries(ydb-services-keyvalue-ut PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ library-cpp-cpuid_check
+ cpp-testing-unittest_main
+ ydb-services-keyvalue
+ library-cpp-logger
+ ydb-core-protos
+ core-testlib-default
+)
+target_link_options(ydb-services-keyvalue-ut PRIVATE
+ -ldl
+ -lrt
+ -Wl,--no-as-needed
+ -fPIC
+ -fPIC
+ -lpthread
+ -lrt
+ -ldl
+)
+target_sources(ydb-services-keyvalue-ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/keyvalue/grpc_service_ut.cpp
+)
+set_property(
+ TARGET
+ ydb-services-keyvalue-ut
+ PROPERTY
+ SPLIT_FACTOR
+ 1
+)
+add_yunittest(
+ NAME
+ ydb-services-keyvalue-ut
+ TEST_TARGET
+ ydb-services-keyvalue-ut
+ TEST_ARG
+ --print-before-suite
+ --print-before-test
+ --fork-tests
+ --print-times
+ --show-fails
+)
+set_yunittest_property(
+ TEST
+ ydb-services-keyvalue-ut
+ PROPERTY
+ LABELS
+ MEDIUM
+)
+set_yunittest_property(
+ TEST
+ ydb-services-keyvalue-ut
+ PROPERTY
+ PROCESSORS
+ 1
+)
+set_yunittest_property(
+ TEST
+ ydb-services-keyvalue-ut
+ PROPERTY
+ TIMEOUT
+ 100
+)
+target_allocator(ydb-services-keyvalue-ut
+ cpp-malloc-tcmalloc
+ libs-tcmalloc-no_percpu_cache
+)
+vcs_info(ydb-services-keyvalue-ut)
diff --git a/ydb/services/keyvalue/ut/CMakeLists.txt b/ydb/services/keyvalue/ut/CMakeLists.txt
new file mode 100644
index 0000000000..f8b31df0c1
--- /dev/null
+++ b/ydb/services/keyvalue/ut/CMakeLists.txt
@@ -0,0 +1,17 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin-x86_64.txt)
+elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
+ include(CMakeLists.windows-x86_64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-x86_64.txt)
+endif()
diff --git a/ydb/services/keyvalue/ut/CMakeLists.windows-x86_64.txt b/ydb/services/keyvalue/ut/CMakeLists.windows-x86_64.txt
new file mode 100644
index 0000000000..6bfcb33387
--- /dev/null
+++ b/ydb/services/keyvalue/ut/CMakeLists.windows-x86_64.txt
@@ -0,0 +1,73 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_executable(ydb-services-keyvalue-ut)
+target_compile_options(ydb-services-keyvalue-ut PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_include_directories(ydb-services-keyvalue-ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/keyvalue
+)
+target_link_libraries(ydb-services-keyvalue-ut PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ library-cpp-cpuid_check
+ cpp-testing-unittest_main
+ ydb-services-keyvalue
+ library-cpp-logger
+ ydb-core-protos
+ core-testlib-default
+)
+target_sources(ydb-services-keyvalue-ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/keyvalue/grpc_service_ut.cpp
+)
+set_property(
+ TARGET
+ ydb-services-keyvalue-ut
+ PROPERTY
+ SPLIT_FACTOR
+ 1
+)
+add_yunittest(
+ NAME
+ ydb-services-keyvalue-ut
+ TEST_TARGET
+ ydb-services-keyvalue-ut
+ TEST_ARG
+ --print-before-suite
+ --print-before-test
+ --fork-tests
+ --print-times
+ --show-fails
+)
+set_yunittest_property(
+ TEST
+ ydb-services-keyvalue-ut
+ PROPERTY
+ LABELS
+ MEDIUM
+)
+set_yunittest_property(
+ TEST
+ ydb-services-keyvalue-ut
+ PROPERTY
+ PROCESSORS
+ 1
+)
+set_yunittest_property(
+ TEST
+ ydb-services-keyvalue-ut
+ PROPERTY
+ TIMEOUT
+ 100
+)
+target_allocator(ydb-services-keyvalue-ut
+ system_allocator
+)
+vcs_info(ydb-services-keyvalue-ut)
diff --git a/ydb/services/keyvalue/ut/ya.make b/ydb/services/keyvalue/ut/ya.make
new file mode 100644
index 0000000000..22227dc72a
--- /dev/null
+++ b/ydb/services/keyvalue/ut/ya.make
@@ -0,0 +1,19 @@
+UNITTEST_FOR(ydb/services/keyvalue)
+
+SIZE(MEDIUM)
+
+TIMEOUT(100)
+
+SRCS(
+ grpc_service_ut.cpp
+)
+
+PEERDIR(
+ library/cpp/logger
+ ydb/core/protos
+ ydb/core/testlib/default
+)
+
+YQL_LAST_ABI_VERSION()
+
+END()
diff --git a/ydb/services/keyvalue/ya.make b/ydb/services/keyvalue/ya.make
new file mode 100644
index 0000000000..7250b9aef4
--- /dev/null
+++ b/ydb/services/keyvalue/ya.make
@@ -0,0 +1,20 @@
+LIBRARY()
+
+SRCS(
+ grpc_service.cpp
+)
+
+PEERDIR(
+ ydb/public/api/grpc
+ library/cpp/grpc/server
+ ydb/core/grpc_services
+ ydb/core/grpc_services/base
+ ydb/core/kesus/tablet
+ ydb/core/keyvalue
+)
+
+END()
+
+RECURSE_FOR_TESTS(
+ ut
+)
diff --git a/ydb/services/ya.make b/ydb/services/ya.make
index a89961e2ba..da97baa73c 100644
--- a/ydb/services/ya.make
+++ b/ydb/services/ya.make
@@ -7,6 +7,7 @@ RECURSE(
discovery
fq
kesus
+ keyvalue
lib
local_discovery
maintenance