diff options
author | kruall <kruall@ydb.tech> | 2023-06-21 15:45:15 +0300 |
---|---|---|
committer | kruall <kruall@ydb.tech> | 2023-06-21 15:45:15 +0300 |
commit | 639e4f87a1eab243840614a867086f32356ffde2 (patch) | |
tree | 4dd3b7f845d389a45498bffc4cf354711648f8d4 | |
parent | 21d8ea644e826af29c32c267c542a69cc139effb (diff) | |
download | ydb-639e4f87a1eab243840614a867086f32356ffde2.tar.gz |
Move kv api logic to oss,
24 files changed, 2526 insertions, 0 deletions
diff --git a/ydb/core/grpc_services/CMakeLists.darwin-x86_64.txt b/ydb/core/grpc_services/CMakeLists.darwin-x86_64.txt index 3bb4353c63..518f9b5671 100644 --- a/ydb/core/grpc_services/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/grpc_services/CMakeLists.darwin-x86_64.txt @@ -109,6 +109,7 @@ target_sources(ydb-core-grpc_services PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_import.cpp ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_import_data.cpp ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_keep_alive.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_keyvalue.cpp ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_kh_describe.cpp ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_kh_snapshots.cpp ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_kqp_base.cpp diff --git a/ydb/core/grpc_services/CMakeLists.linux-aarch64.txt b/ydb/core/grpc_services/CMakeLists.linux-aarch64.txt index aa9c33bfe2..2316cc559b 100644 --- a/ydb/core/grpc_services/CMakeLists.linux-aarch64.txt +++ b/ydb/core/grpc_services/CMakeLists.linux-aarch64.txt @@ -110,6 +110,7 @@ target_sources(ydb-core-grpc_services PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_import.cpp ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_import_data.cpp ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_keep_alive.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_keyvalue.cpp ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_kh_describe.cpp ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_kh_snapshots.cpp ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_kqp_base.cpp diff --git a/ydb/core/grpc_services/CMakeLists.linux-x86_64.txt b/ydb/core/grpc_services/CMakeLists.linux-x86_64.txt index aa9c33bfe2..2316cc559b 100644 --- a/ydb/core/grpc_services/CMakeLists.linux-x86_64.txt +++ b/ydb/core/grpc_services/CMakeLists.linux-x86_64.txt @@ -110,6 +110,7 @@ target_sources(ydb-core-grpc_services PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_import.cpp ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_import_data.cpp ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_keep_alive.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_keyvalue.cpp ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_kh_describe.cpp ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_kh_snapshots.cpp ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_kqp_base.cpp diff --git a/ydb/core/grpc_services/CMakeLists.windows-x86_64.txt b/ydb/core/grpc_services/CMakeLists.windows-x86_64.txt index 3bb4353c63..518f9b5671 100644 --- a/ydb/core/grpc_services/CMakeLists.windows-x86_64.txt +++ b/ydb/core/grpc_services/CMakeLists.windows-x86_64.txt @@ -109,6 +109,7 @@ target_sources(ydb-core-grpc_services PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_import.cpp ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_import_data.cpp ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_keep_alive.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_keyvalue.cpp ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_kh_describe.cpp ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_kh_snapshots.cpp ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_kqp_base.cpp diff --git a/ydb/core/grpc_services/rpc_keyvalue.cpp b/ydb/core/grpc_services/rpc_keyvalue.cpp new file mode 100644 index 0000000000..aea0d568c9 --- /dev/null +++ b/ydb/core/grpc_services/rpc_keyvalue.cpp @@ -0,0 +1,1048 @@ +#include "service_keyvalue.h" + +#include <ydb/public/api/protos/ydb_keyvalue.pb.h> + +#include <ydb/core/base/path.h> +#include <ydb/core/grpc_services/rpc_scheme_base.h> +#include <ydb/core/grpc_services/rpc_common.h> +#include <ydb/core/keyvalue/keyvalue_events.h> +#include <ydb/core/tx/scheme_cache/scheme_cache.h> +#include <ydb/core/mind/local.h> +#include <ydb/core/protos/local.pb.h> + + +namespace NKikimr::NGRpcService { + +using namespace NActors; +using namespace Ydb; + +using TEvCreateVolumeKeyValueRequest = + TGrpcRequestOperationCall<Ydb::KeyValue::CreateVolumeRequest, + Ydb::KeyValue::CreateVolumeResponse>; +using TEvDropVolumeKeyValueRequest = + TGrpcRequestOperationCall<Ydb::KeyValue::DropVolumeRequest, + Ydb::KeyValue::DropVolumeResponse>; +using TEvAlterVolumeKeyValueRequest = + TGrpcRequestOperationCall<Ydb::KeyValue::AlterVolumeRequest, + Ydb::KeyValue::AlterVolumeResponse>; +using TEvDescribeVolumeKeyValueRequest = + TGrpcRequestOperationCall<Ydb::KeyValue::DescribeVolumeRequest, + Ydb::KeyValue::DescribeVolumeResponse>; +using TEvListLocalPartitionsKeyValueRequest = + TGrpcRequestOperationCall<Ydb::KeyValue::ListLocalPartitionsRequest, + Ydb::KeyValue::ListLocalPartitionsResponse>; + +using TEvAcquireLockKeyValueRequest = + TGrpcRequestOperationCall<Ydb::KeyValue::AcquireLockRequest, + Ydb::KeyValue::AcquireLockResponse>; +using TEvExecuteTransactionKeyValueRequest = + TGrpcRequestOperationCall<Ydb::KeyValue::ExecuteTransactionRequest, + Ydb::KeyValue::ExecuteTransactionResponse>; +using TEvReadKeyValueRequest = + TGrpcRequestOperationCall<Ydb::KeyValue::ReadRequest, + Ydb::KeyValue::ReadResponse>; +using TEvReadRangeKeyValueRequest = + TGrpcRequestOperationCall<Ydb::KeyValue::ReadRangeRequest, + Ydb::KeyValue::ReadRangeResponse>; +using TEvListRangeKeyValueRequest = + TGrpcRequestOperationCall<Ydb::KeyValue::ListRangeRequest, + Ydb::KeyValue::ListRangeResponse>; +using TEvGetStorageChannelStatusKeyValueRequest = + TGrpcRequestOperationCall<Ydb::KeyValue::GetStorageChannelStatusRequest, + Ydb::KeyValue::GetStorageChannelStatusResponse>; + +} // namespace NKikimr::NGRpcService + + +namespace NKikimr::NGRpcService { + +using namespace NActors; +using namespace Ydb; + +#define COPY_PRIMITIVE_FIELD(name) \ + to->set_ ## name(static_cast<decltype(to->name())>(from.name())) \ +// COPY_PRIMITIVE_FIELD + +#define COPY_PRIMITIVE_OPTIONAL_FIELD(name) \ + if (from.has_ ## name()) { \ + to->set_ ## name(static_cast<decltype(to->name())>(from.name())); \ + } \ +// COPY_PRIMITIVE_FIELD + +namespace { + +void CopyProtobuf(const Ydb::KeyValue::AcquireLockRequest &/*from*/, + NKikimrKeyValue::AcquireLockRequest */*to*/) +{ +} + +void CopyProtobuf(const NKikimrKeyValue::AcquireLockResult &from, + Ydb::KeyValue::AcquireLockResult *to) +{ + COPY_PRIMITIVE_FIELD(lock_generation); + COPY_PRIMITIVE_FIELD(node_id); +} + + +void CopyProtobuf(const Ydb::KeyValue::ExecuteTransactionRequest::Command::Rename &from, + NKikimrKeyValue::ExecuteTransactionRequest::Command::Rename *to) +{ + COPY_PRIMITIVE_FIELD(old_key); + COPY_PRIMITIVE_FIELD(new_key); +} + +void CopyProtobuf(const Ydb::KeyValue::ExecuteTransactionRequest::Command::Concat &from, + NKikimrKeyValue::ExecuteTransactionRequest::Command::Concat *to) +{ + *to->mutable_input_keys() = from.input_keys(); + COPY_PRIMITIVE_FIELD(output_key); + COPY_PRIMITIVE_FIELD(keep_inputs); +} + +void CopyProtobuf(const Ydb::KeyValue::KeyRange &from, NKikimrKeyValue::KVRange *to) { +#define CHECK_AND_SET(name) \ + if (from.has_ ## name()) { \ + COPY_PRIMITIVE_FIELD(name); \ + } \ +// CHECK_AND_SET + + CHECK_AND_SET(from_key_inclusive) + CHECK_AND_SET(from_key_exclusive) + CHECK_AND_SET(to_key_inclusive) + CHECK_AND_SET(to_key_exclusive) + +#undef CHECK_AND_SET +} + +void CopyProtobuf(const Ydb::KeyValue::ExecuteTransactionRequest::Command::CopyRange &from, + NKikimrKeyValue::ExecuteTransactionRequest::Command::CopyRange *to) +{ + CopyProtobuf(from.range(), to->mutable_range()); + COPY_PRIMITIVE_FIELD(prefix_to_remove); + COPY_PRIMITIVE_FIELD(prefix_to_add); +} + +template <typename TProtoFrom, typename TProtoTo> +void CopyPriority(TProtoFrom &&from, TProtoTo *to) { + switch(from.priority()) { + case Ydb::KeyValue::Priorities::PRIORITY_REALTIME: + to->set_priority(NKikimrKeyValue::Priorities::PRIORITY_REALTIME); + break; + case Ydb::KeyValue::Priorities::PRIORITY_BACKGROUND: + to->set_priority(NKikimrKeyValue::Priorities::PRIORITY_BACKGROUND); + break; + default: + to->set_priority(NKikimrKeyValue::Priorities::PRIORITY_UNSPECIFIED); + break; + } +} + +void CopyProtobuf(const Ydb::KeyValue::ExecuteTransactionRequest::Command::Write &from, + NKikimrKeyValue::ExecuteTransactionRequest::Command::Write *to) +{ + COPY_PRIMITIVE_FIELD(key); + COPY_PRIMITIVE_FIELD(value); + COPY_PRIMITIVE_FIELD(storage_channel); + CopyPriority(from, to); + switch(from.tactic()) { + case Ydb::KeyValue::ExecuteTransactionRequest::Command::Write::TACTIC_MAX_THROUGHPUT: + to->set_tactic(NKikimrKeyValue::ExecuteTransactionRequest::Command::Write::TACTIC_MAX_THROUGHPUT); + break; + case Ydb::KeyValue::ExecuteTransactionRequest::Command::Write::TACTIC_MIN_LATENCY: + to->set_tactic(NKikimrKeyValue::ExecuteTransactionRequest::Command::Write::TACTIC_MIN_LATENCY); + break; + default: + to->set_tactic(NKikimrKeyValue::ExecuteTransactionRequest::Command::Write::TACTIC_UNSPECIFIED); + break; + } +} + +void CopyProtobuf(const Ydb::KeyValue::ExecuteTransactionRequest::Command::DeleteRange &from, + NKikimrKeyValue::ExecuteTransactionRequest::Command::DeleteRange *to) +{ + CopyProtobuf(from.range(), to->mutable_range()); +} + +void CopyProtobuf(const Ydb::KeyValue::ExecuteTransactionRequest::Command &from, + NKikimrKeyValue::ExecuteTransactionRequest::Command *to) +{ +#define CHECK_AND_COPY(name) \ + if (from.has_ ## name()) { \ + CopyProtobuf(from.name(), to->mutable_ ## name()); \ + } \ +// CHECK_AND_COPY + + CHECK_AND_COPY(rename) + CHECK_AND_COPY(concat) + CHECK_AND_COPY(copy_range) + CHECK_AND_COPY(write) + CHECK_AND_COPY(delete_range) + +#undef CHECK_AND_COPY +} + +void CopyProtobuf(const Ydb::KeyValue::ExecuteTransactionRequest &from, + NKikimrKeyValue::ExecuteTransactionRequest *to) +{ + COPY_PRIMITIVE_OPTIONAL_FIELD(lock_generation); + for (auto &cmd : from.commands()) { + CopyProtobuf(cmd, to->add_commands()); + } +} + +void CopyProtobuf(const NKikimrKeyValue::StorageChannel &from, Ydb::KeyValue::StorageChannelInfo *to) { + COPY_PRIMITIVE_FIELD(storage_channel); + COPY_PRIMITIVE_FIELD(status_flag); +} + +void CopyProtobuf(const NKikimrKeyValue::ExecuteTransactionResult &from, + Ydb::KeyValue::ExecuteTransactionResult *to) +{ + COPY_PRIMITIVE_FIELD(node_id); + for (auto &channel : from.storage_channel()) { + CopyProtobuf(channel, to->add_storage_channel_info()); + } +} + +void CopyProtobuf(const Ydb::KeyValue::ReadRequest &from, NKikimrKeyValue::ReadRequest *to) { + COPY_PRIMITIVE_OPTIONAL_FIELD(lock_generation); + COPY_PRIMITIVE_FIELD(key); + COPY_PRIMITIVE_FIELD(offset); + COPY_PRIMITIVE_FIELD(size); + CopyPriority(from, to); + COPY_PRIMITIVE_FIELD(limit_bytes); +} + +void CopyProtobuf(const NKikimrKeyValue::ReadResult &from, Ydb::KeyValue::ReadResult *to) { + COPY_PRIMITIVE_FIELD(requested_key); + COPY_PRIMITIVE_FIELD(requested_offset); + COPY_PRIMITIVE_FIELD(requested_size); + COPY_PRIMITIVE_FIELD(value); + COPY_PRIMITIVE_FIELD(node_id); + switch (from.status()) { + case NKikimrKeyValue::Statuses::RSTATUS_OVERRUN: + to->set_is_overrun(true); + break; + default: + break; + } +} + +void CopyProtobuf(const Ydb::KeyValue::ReadRangeRequest &from, NKikimrKeyValue::ReadRangeRequest *to) { + COPY_PRIMITIVE_OPTIONAL_FIELD(lock_generation); + CopyProtobuf(from.range(), to->mutable_range()); + to->set_include_data(true); + COPY_PRIMITIVE_FIELD(limit_bytes); + CopyPriority(from, to); +} + +void CopyProtobuf(const Ydb::KeyValue::ListRangeRequest &from, NKikimrKeyValue::ReadRangeRequest *to) { + COPY_PRIMITIVE_OPTIONAL_FIELD(lock_generation); + CopyProtobuf(from.range(), to->mutable_range()); + to->set_include_data(false); + COPY_PRIMITIVE_FIELD(limit_bytes); +} + +void CopyProtobuf(const NKikimrKeyValue::ReadRangeResult::KeyValuePair &from, + Ydb::KeyValue::ReadRangeResult::KeyValuePair *to) +{ + COPY_PRIMITIVE_FIELD(key); + COPY_PRIMITIVE_FIELD(value); + COPY_PRIMITIVE_FIELD(creation_unix_time); + COPY_PRIMITIVE_FIELD(storage_channel); +} + +void CopyProtobuf(const NKikimrKeyValue::ReadRangeResult &from, + Ydb::KeyValue::ReadRangeResult *to) +{ + for (auto &pair : from.pair()) { + CopyProtobuf(pair, to->add_pair()); + } + if (from.status() == NKikimrKeyValue::Statuses::RSTATUS_OVERRUN) { + to->set_is_overrun(true); + } + COPY_PRIMITIVE_FIELD(node_id); +} + +void CopyProtobuf(const NKikimrKeyValue::ReadRangeResult::KeyValuePair &from, + Ydb::KeyValue::ListRangeResult::KeyInfo *to) +{ + COPY_PRIMITIVE_FIELD(key); + COPY_PRIMITIVE_FIELD(value_size); + COPY_PRIMITIVE_FIELD(creation_unix_time); + COPY_PRIMITIVE_FIELD(storage_channel); +} + +void CopyProtobuf(const NKikimrKeyValue::ReadRangeResult &from, + Ydb::KeyValue::ListRangeResult *to) +{ + for (auto &pair : from.pair()) { + CopyProtobuf(pair, to->add_key()); + } + if (from.status() == NKikimrKeyValue::Statuses::RSTATUS_OVERRUN) { + to->set_is_overrun(true); + } + COPY_PRIMITIVE_FIELD(node_id); +} + +void CopyProtobuf(const Ydb::KeyValue::GetStorageChannelStatusRequest &from, + NKikimrKeyValue::GetStorageChannelStatusRequest *to) +{ + COPY_PRIMITIVE_OPTIONAL_FIELD(lock_generation); + *to->mutable_storage_channel() = from.storage_channel(); +} + + +void CopyProtobuf(const NKikimrKeyValue::GetStorageChannelStatusResult &from, + Ydb::KeyValue::GetStorageChannelStatusResult *to) +{ + for (auto &channel : from.storage_channel()) { + CopyProtobuf(channel, to->add_storage_channel_info()); + } + COPY_PRIMITIVE_FIELD(node_id); +} + + +Ydb::StatusIds::StatusCode PullStatus(const NKikimrKeyValue::AcquireLockResult &) { + return Ydb::StatusIds::SUCCESS; +} + +template <typename TResult> +Ydb::StatusIds::StatusCode PullStatus(const TResult &result) { + switch (result.status()) { + case NKikimrKeyValue::Statuses::RSTATUS_OK: + case NKikimrKeyValue::Statuses::RSTATUS_OVERRUN: + return Ydb::StatusIds::SUCCESS; + case NKikimrKeyValue::Statuses::RSTATUS_ERROR: + return Ydb::StatusIds::GENERIC_ERROR; + case NKikimrKeyValue::Statuses::RSTATUS_TIMEOUT: + return Ydb::StatusIds::TIMEOUT; + case NKikimrKeyValue::Statuses::RSTATUS_NOT_FOUND: + return Ydb::StatusIds::NOT_FOUND; + case NKikimrKeyValue::Statuses::RSTATUS_WRONG_LOCK_GENERATION: + return Ydb::StatusIds::PRECONDITION_FAILED; + default: + return Ydb::StatusIds::INTERNAL_ERROR; + } +} + +namespace { + void AssignPoolKinds(auto &storageConfig, auto *internalStorageConfig) { + ui32 size = storageConfig.channel_size(); + + for (ui32 channelIdx = 0; channelIdx < size; ++channelIdx) { + internalStorageConfig->AddChannel()->SetPreferredPoolKind(storageConfig.channel(channelIdx).media()); + } + } +} + + +class TCreateVolumeRequest : public TRpcSchemeRequestActor<TCreateVolumeRequest, TEvCreateVolumeKeyValueRequest> { +public: + using TBase = TRpcSchemeRequestActor<TCreateVolumeRequest, TEvCreateVolumeKeyValueRequest>; + using TBase::TBase; + + void Bootstrap(const TActorContext& ctx) { + TBase::Bootstrap(ctx); + Become(&TCreateVolumeRequest::StateFunc); + SendProposeRequest(ctx); + } + + void SendProposeRequest(const TActorContext &ctx) { + const auto req = this->GetProtoRequest(); + + std::pair<TString, TString> pathPair; + try { + pathPair = SplitPath(Request_->GetDatabaseName(), req->path()); + } catch (const std::exception& ex) { + Request_->RaiseIssue(NYql::ExceptionToIssue(ex)); + return Reply(StatusIds::BAD_REQUEST, ctx); + } + const auto& workingDir = pathPair.first; + const auto& name = pathPair.second; + + std::unique_ptr<TEvTxUserProxy::TEvProposeTransaction> proposeRequest = this->CreateProposeTransaction(); + NKikimrTxUserProxy::TEvProposeTransaction& record = proposeRequest->Record; + NKikimrSchemeOp::TModifyScheme* modifyScheme = record.MutableTransaction()->MutableModifyScheme(); + modifyScheme->SetWorkingDir(workingDir); + NKikimrSchemeOp::TCreateSolomonVolume* tableDesc = nullptr; + + modifyScheme->SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpCreateSolomonVolume); + tableDesc = modifyScheme->MutableCreateSolomonVolume(); + tableDesc->SetName(name); + tableDesc->SetPartitionCount(req->partition_count()); + + if (GetProtoRequest()->has_storage_config()) { + auto &storageConfig = GetProtoRequest()->storage_config(); + auto *internalStorageConfig = tableDesc->MutableStorageConfig(); + AssignPoolKinds(storageConfig, internalStorageConfig); + } else { + tableDesc->SetChannelProfileId(GetProtoRequest()->partition_count()); + } + + ctx.Send(MakeTxProxyID(), proposeRequest.release()); + } + + STFUNC(StateFunc) { + return TBase::StateWork(ev); + } +}; + + +class TDropVolumeRequest : public TRpcSchemeRequestActor<TDropVolumeRequest, TEvDropVolumeKeyValueRequest> { +public: + using TBase = TRpcSchemeRequestActor<TDropVolumeRequest, TEvDropVolumeKeyValueRequest>; + using TBase::TBase; + + void Bootstrap(const TActorContext& ctx) { + TBase::Bootstrap(ctx); + Become(&TDropVolumeRequest::StateFunc); + SendProposeRequest(ctx); + } + + void SendProposeRequest(const TActorContext &ctx) { + const auto req = this->GetProtoRequest(); + + std::pair<TString, TString> pathPair; + try { + pathPair = SplitPath(req->path()); + } catch (const std::exception& ex) { + Request_->RaiseIssue(NYql::ExceptionToIssue(ex)); + return Reply(StatusIds::BAD_REQUEST, ctx); + } + const auto& workingDir = pathPair.first; + const auto& name = pathPair.second; + + std::unique_ptr<TEvTxUserProxy::TEvProposeTransaction> proposeRequest = this->CreateProposeTransaction(); + NKikimrTxUserProxy::TEvProposeTransaction& record = proposeRequest->Record; + NKikimrSchemeOp::TModifyScheme* modifyScheme = record.MutableTransaction()->MutableModifyScheme(); + modifyScheme->SetWorkingDir(workingDir); + NKikimrSchemeOp::TDrop* drop = nullptr; + + modifyScheme->SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpDropSolomonVolume); + drop = modifyScheme->MutableDrop(); + drop->SetName(name); + + ctx.Send(MakeTxProxyID(), proposeRequest.release()); + } + + STFUNC(StateFunc) { + return TBase::StateWork(ev); + } +}; + +class TAlterVolumeRequest : public TRpcSchemeRequestActor<TAlterVolumeRequest, TEvAlterVolumeKeyValueRequest> { +public: + using TBase = TRpcSchemeRequestActor<TAlterVolumeRequest, TEvAlterVolumeKeyValueRequest>; + using TBase::TBase; + + void Bootstrap(const TActorContext& ctx) { + TBase::Bootstrap(ctx); + Become(&TAlterVolumeRequest::StateFunc); + SendProposeRequest(ctx); + } + + void SendProposeRequest(const TActorContext &ctx) { + const auto req = this->GetProtoRequest(); + + std::pair<TString, TString> pathPair; + try { + pathPair = SplitPath(req->path()); + } catch (const std::exception& ex) { + Request_->RaiseIssue(NYql::ExceptionToIssue(ex)); + return Reply(StatusIds::BAD_REQUEST, ctx); + } + const auto& workingDir = pathPair.first; + const auto& name = pathPair.second; + + std::unique_ptr<TEvTxUserProxy::TEvProposeTransaction> proposeRequest = this->CreateProposeTransaction(); + NKikimrTxUserProxy::TEvProposeTransaction& record = proposeRequest->Record; + NKikimrSchemeOp::TModifyScheme* modifyScheme = record.MutableTransaction()->MutableModifyScheme(); + modifyScheme->SetWorkingDir(workingDir); + NKikimrSchemeOp::TAlterSolomonVolume* tableDesc = nullptr; + + modifyScheme->SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpAlterSolomonVolume); + tableDesc = modifyScheme->MutableAlterSolomonVolume(); + tableDesc->SetName(name); + tableDesc->SetPartitionCount(req->alter_partition_count()); + + if (GetProtoRequest()->has_storage_config()) { + tableDesc->SetUpdateChannelsBinding(true); + auto &storageConfig = GetProtoRequest()->storage_config(); + auto *internalStorageConfig = tableDesc->MutableStorageConfig(); + AssignPoolKinds(storageConfig, internalStorageConfig); + } else { + tableDesc->SetUpdateChannelsBinding(false); + tableDesc->SetChannelProfileId(0); + } + + ctx.Send(MakeTxProxyID(), proposeRequest.release()); + } + + STFUNC(StateFunc) { + return TBase::StateWork(ev); + } +}; + +template <typename TDerived> +class TBaseKeyValueRequest { +protected: + void OnBootstrap() { + auto self = static_cast<TDerived*>(this); + Ydb::StatusIds::StatusCode status = Ydb::StatusIds::STATUS_CODE_UNSPECIFIED; + NYql::TIssues issues; + if (!self->ValidateRequest(status, issues)) { + self->Reply(status, issues, self->ActorContext()); + return; + } + if (const auto& userToken = self->Request_->GetSerializedToken()) { + UserToken = new NACLib::TUserToken(userToken); + } + SendNavigateRequest(); + } + + void SendNavigateRequest() { + auto self = static_cast<TDerived*>(this); + auto &rec = *self->GetProtoRequest(); + auto req = MakeHolder<NSchemeCache::TSchemeCacheNavigate>(); + auto& entry = req->ResultSet.emplace_back(); + entry.Path = ::NKikimr::SplitPath(rec.path()); + entry.RequestType = NSchemeCache::TSchemeCacheNavigate::TEntry::ERequestType::ByPath; + entry.ShowPrivatePath = true; + entry.SyncVersion = false; + req->UserToken = UserToken; + req->DatabaseName = self->Request_->GetDatabaseName().GetOrElse(""); + auto ev = new TEvTxProxySchemeCache::TEvNavigateKeySet(req.Release()); + self->Send(MakeSchemeCacheID(), ev); + } + + bool OnNavigateKeySetResult(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr &ev, ui32 access) { + auto self = static_cast<TDerived*>(this); + TEvTxProxySchemeCache::TEvNavigateKeySetResult* res = ev->Get(); + NSchemeCache::TSchemeCacheNavigate *request = res->Request.Get(); + + auto ctx = self->ActorContext(); + + if (res->Request->ResultSet.size() != 1) { + self->Reply(StatusIds::INTERNAL_ERROR, "Received an incorrect answer from SchemeCache.", NKikimrIssues::TIssuesIds::UNEXPECTED, ctx); + return false; + } + + switch (request->ResultSet[0].Status) { + case NSchemeCache::TSchemeCacheNavigate::EStatus::Ok: + break; + case NSchemeCache::TSchemeCacheNavigate::EStatus::RootUnknown: + case NSchemeCache::TSchemeCacheNavigate::EStatus::PathErrorUnknown: + self->Reply(StatusIds::SCHEME_ERROR, "Path isn't exist.", NKikimrIssues::TIssuesIds::PATH_NOT_EXIST, ctx); + return false; + case NSchemeCache::TSchemeCacheNavigate::EStatus::LookupError: + case NSchemeCache::TSchemeCacheNavigate::EStatus::RedirectLookupError: + self->Reply(StatusIds::UNAVAILABLE, "Database resolve failed with no certain result.", NKikimrIssues::TIssuesIds::RESOLVE_LOOKUP_ERROR, ctx); + return false; + default: + self->Reply(StatusIds::UNAVAILABLE, "Resolve error", NKikimrIssues::TIssuesIds::GENERIC_RESOLVE_ERROR, ctx); + return false; + } + + if (!self->CheckAccess(CanonizePath(res->Request->ResultSet[0].Path), res->Request->ResultSet[0].SecurityObject, access)) { + return false; + } + if (!request->ResultSet[0].SolomonVolumeInfo) { + self->Reply(StatusIds::SCHEME_ERROR, "Table isn't keyvalue.", NKikimrIssues::TIssuesIds::DEFAULT_ERROR, ctx); + return false; + } + + return true; + } + + bool CheckAccess(const TString& path, TIntrusivePtr<TSecurityObject> securityObject, ui32 access) { + auto self = static_cast<TDerived*>(this); + if (!UserToken || !securityObject) { + return true; + } + + if (securityObject->CheckAccess(access, *UserToken)) { + return true; + } + + self->Reply(Ydb::StatusIds::UNAUTHORIZED, + TStringBuilder() << "Access denied" + << ": for# " << UserToken->GetUserSID() + << ", path# " << path + << ", access# " << NACLib::AccessRightsToString(access), + NKikimrIssues::TIssuesIds::ACCESS_DENIED, + self->ActorContext()); + return false; + } + +private: + TIntrusiveConstPtr<NACLib::TUserToken> UserToken; +}; + +class TDescribeVolumeRequest + : public TRpcOperationRequestActor<TDescribeVolumeRequest, TEvDescribeVolumeKeyValueRequest> + , public TBaseKeyValueRequest<TDescribeVolumeRequest> +{ +public: + using TBase = TRpcOperationRequestActor<TDescribeVolumeRequest, TEvDescribeVolumeKeyValueRequest>; + using TBase::TBase; + + friend class TBaseKeyValueRequest<TDescribeVolumeRequest>; + + void Bootstrap(const TActorContext& ctx) { + TBase::Bootstrap(ctx); + OnBootstrap(); + Become(&TDescribeVolumeRequest::StateFunc); + } + + +protected: + STFUNC(StateFunc) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle); + default: + return TBase::StateFuncBase(ev); + } + } + + void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr &ev) { + TEvTxProxySchemeCache::TEvNavigateKeySetResult* res = ev->Get(); + NSchemeCache::TSchemeCacheNavigate *request = res->Request.Get(); + + if (!OnNavigateKeySetResult(ev, NACLib::DescribeSchema)) { + return; + } + + const NKikimrSchemeOp::TSolomonVolumeDescription &desc = request->ResultSet[0].SolomonVolumeInfo->Description; + Ydb::KeyValue::DescribeVolumeResult result; + result.set_path(this->GetProtoRequest()->path()); + result.set_partition_count(desc.PartitionsSize()); + this->ReplyWithResult(Ydb::StatusIds::SUCCESS, result, TActivationContext::AsActorContext()); + } + + bool ValidateRequest(Ydb::StatusIds::StatusCode& /*status*/, NYql::TIssues& /*issues*/) { + return true; + } + +private: + TIntrusiveConstPtr<NACLib::TUserToken> UserToken; +}; + + +class TListLocalPartitionsRequest + : public TRpcOperationRequestActor<TListLocalPartitionsRequest, TEvListLocalPartitionsKeyValueRequest> + , public TBaseKeyValueRequest<TListLocalPartitionsRequest> +{ +public: + using TBase = TRpcOperationRequestActor<TListLocalPartitionsRequest, TEvListLocalPartitionsKeyValueRequest>; + using TBase::TBase; + + friend class TBaseKeyValueRequest<TListLocalPartitionsRequest>; + + void Bootstrap(const TActorContext& ctx) { + TBase::Bootstrap(ctx); + OnBootstrap(); + Become(&TListLocalPartitionsRequest::StateFunc); + } + +protected: + STFUNC(StateFunc) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle); + hFunc(TEvLocal::TEvEnumerateTabletsResult, Handle); + default: + return TBase::StateFuncBase(ev); + } + } + + void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr &ev) { + TEvTxProxySchemeCache::TEvNavigateKeySetResult* res = ev->Get(); + NSchemeCache::TSchemeCacheNavigate *request = res->Request.Get(); + + if (!OnNavigateKeySetResult(ev, NACLib::DescribeSchema)) { + return; + } + + const NKikimrSchemeOp::TSolomonVolumeDescription &desc = request->ResultSet[0].SolomonVolumeInfo->Description; + for (const NKikimrSchemeOp::TSolomonVolumeDescription::TPartition &partition : desc.GetPartitions()) { + TabletIdToPartitionId[partition.GetTabletId()] = partition.GetPartitionId(); + } + + if (TabletIdToPartitionId.empty()) { + Ydb::KeyValue::ListLocalPartitionsResult result; + result.set_path(this->GetProtoRequest()->path()); + result.set_node_id(SelfId().NodeId()); + this->ReplyWithResult(Ydb::StatusIds::SUCCESS, result, TActivationContext::AsActorContext()); + return; + } + + SendRequest(); + } + + TActorId MakeLocalRegistrarID() { + auto &ctx = TActivationContext::AsActorContext(); + auto &domainsInfo = *AppData(ctx)->DomainsInfo; + auto domainIt = domainsInfo.Domains.find(1); + if (domainIt == domainsInfo.Domains.end()) { + TActorId invalidId; + return invalidId; + } + auto &rec = *this->GetProtoRequest(); + ui32 nodeId = rec.node_id() ? rec.node_id() : ctx.SelfID.NodeId(); + ui32 hiveUid = domainsInfo.GetDefaultHiveUid(1); + ui64 hiveId = domainsInfo.GetHive(hiveUid); + return ::NKikimr::MakeLocalRegistrarID(nodeId, hiveId); + } + + TEvLocal::TEvEnumerateTablets* MakeRequest() { + return new TEvLocal::TEvEnumerateTablets(TTabletTypes::KeyValue); + } + + void SendRequest() { + Send(MakeLocalRegistrarID(), MakeRequest(), IEventHandle::FlagTrackDelivery, 0); + } + + void Handle(TEvLocal::TEvEnumerateTabletsResult::TPtr &ev) { + const NKikimrLocal::TEvEnumerateTabletsResult &record = ev->Get()->Record; + if (!record.HasStatus() || record.GetStatus() != NKikimrProto::OK) { + this->Reply(StatusIds::INTERNAL_ERROR, "Received an incorrect answer from Local.", NKikimrIssues::TIssuesIds::UNEXPECTED, this->ActorContext()); + return; + } + + Ydb::KeyValue::ListLocalPartitionsResult result; + result.set_path(this->GetProtoRequest()->path()); + result.set_node_id(SelfId().NodeId()); + for (auto &item : record.GetTabletInfo()) { + if (!item.HasTabletId()) { + continue; + } + auto it = TabletIdToPartitionId.find(item.GetTabletId()); + if (it != TabletIdToPartitionId.end()) { + result.add_partition_ids(it->second); + } + } + this->ReplyWithResult(Ydb::StatusIds::SUCCESS, result, TActivationContext::AsActorContext()); + } + + bool ValidateRequest(Ydb::StatusIds::StatusCode& /*status*/, NYql::TIssues& /*issues*/) { + return true; + } + +private: + THashMap<ui64, ui64> TabletIdToPartitionId; +}; + + +template <typename TDerived, typename TRequest, typename TResultRecord, typename TKVRequest> +class TKeyValueRequestGrpc + : public TRpcOperationRequestActor<TDerived, TRequest> + , public TBaseKeyValueRequest<TKeyValueRequestGrpc<TDerived, TRequest, TResultRecord, TKVRequest>> +{ +public: + using TBase = TRpcOperationRequestActor<TDerived, TRequest>; + using TBase::TBase; + + friend class TBaseKeyValueRequest<TKeyValueRequestGrpc<TDerived, TRequest, TResultRecord, TKVRequest>>; + + void Bootstrap(const TActorContext& ctx) { + TBase::Bootstrap(ctx); + this->OnBootstrap(); + this->Become(&TKeyValueRequestGrpc::StateFunc); + } + + +protected: + STFUNC(StateFunc) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvTabletPipe::TEvClientConnected, Handle); + hFunc(TEvTabletPipe::TEvClientDestroyed, Handle); + hFunc(TKVRequest::TResponse, Handle); + hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle); + default: + return TBase::StateFuncBase(ev); + } + } + + void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr &ev) { + TEvTxProxySchemeCache::TEvNavigateKeySetResult* res = ev->Get(); + NSchemeCache::TSchemeCacheNavigate *request = res->Request.Get(); + + if (!this->OnNavigateKeySetResult(ev, static_cast<TDerived*>(this)->GetRequiredAccessRights())) { + return; + } + + auto &rec = *this->GetProtoRequest(); + const NKikimrSchemeOp::TSolomonVolumeDescription &desc = request->ResultSet[0].SolomonVolumeInfo->Description; + + if (rec.partition_id() >= desc.PartitionsSize()) { + this->Reply(StatusIds::SCHEME_ERROR, "The partition wasn't found. Partition ID was larger or equal partition count.", NKikimrIssues::TIssuesIds::DEFAULT_ERROR, this->ActorContext()); + return; + } + + ui64 partitionId = rec.partition_id(); + if (const auto &partition = desc.GetPartitions(rec.partition_id()); partition.GetPartitionId() == partitionId) { + KVTabletId = partition.GetTabletId(); + } else { + Y_VERIFY_DEBUG(false); + for (const NKikimrSchemeOp::TSolomonVolumeDescription::TPartition &partition : desc.GetPartitions()) { + if (partition.GetPartitionId() == partitionId) { + KVTabletId = partition.GetTabletId(); + break; + } + } + } + + if (!KVTabletId) { + this->Reply(StatusIds::INTERNAL_ERROR, "Partition wasn't found.", NKikimrIssues::TIssuesIds::DEFAULT_ERROR, this->ActorContext()); + return; + } + + CreatePipe(); + SendRequest(); + } + + void SendRequest() { + std::unique_ptr<TKVRequest> req = std::make_unique<TKVRequest>(); + auto &rec = *this->GetProtoRequest(); + CopyProtobuf(rec, &req->Record); + req->Record.set_tablet_id(KVTabletId); + NTabletPipe::SendData(this->SelfId(), KVPipeClient, req.release(), 0); + } + + void Handle(typename TKVRequest::TResponse::TPtr &ev) { + TResultRecord result; + CopyProtobuf(ev->Get()->Record, &result); + auto status = PullStatus(ev->Get()->Record); + this->ReplyWithResult(status, result, TActivationContext::AsActorContext()); + } + + NTabletPipe::TClientConfig GetPipeConfig() { + NTabletPipe::TClientConfig cfg; + cfg.RetryPolicy = { + .RetryLimitCount = 3u + }; + return cfg; + } + + void CreatePipe() { + KVPipeClient = this->Register(NTabletPipe::CreateClient(this->SelfId(), KVTabletId, GetPipeConfig())); + } + + void Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev) { + if (ev->Get()->Status != NKikimrProto::OK) { + this->Reply(StatusIds::UNAVAILABLE, "Failed to connect to coordination node.", NKikimrIssues::TIssuesIds::SHARD_NOT_AVAILABLE, this->ActorContext()); + } + } + + void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr&) { + this->Reply(StatusIds::UNAVAILABLE, "Connection to coordination node was lost.", NKikimrIssues::TIssuesIds::SHARD_NOT_AVAILABLE, this->ActorContext()); + } + + virtual bool ValidateRequest(Ydb::StatusIds::StatusCode& status, NYql::TIssues& issues) = 0; + + void PassAway() override { + if (KVPipeClient) { + NTabletPipe::CloseClient(this->SelfId(), KVPipeClient); + KVPipeClient = {}; + } + TBase::PassAway(); + } + +protected: + ui64 KVTabletId = 0; + TActorId KVPipeClient; +}; + +class TAcquireLockRequest + : public TKeyValueRequestGrpc<TAcquireLockRequest, TEvAcquireLockKeyValueRequest, + Ydb::KeyValue::AcquireLockResult, TEvKeyValue::TEvAcquireLock> +{ +public: + using TBase = TKeyValueRequestGrpc<TAcquireLockRequest, TEvAcquireLockKeyValueRequest, + Ydb::KeyValue::AcquireLockResult, TEvKeyValue::TEvAcquireLock>; + using TBase::TBase; + + bool ValidateRequest(Ydb::StatusIds::StatusCode& /*status*/, NYql::TIssues& /*issues*/) override { + return true; + } + NACLib::EAccessRights GetRequiredAccessRights() const { + return NACLib::UpdateRow; + } +}; + + +class TExecuteTransactionRequest + : public TKeyValueRequestGrpc<TExecuteTransactionRequest, TEvExecuteTransactionKeyValueRequest, + Ydb::KeyValue::ExecuteTransactionResult, TEvKeyValue::TEvExecuteTransaction> { +public: + using TBase = TKeyValueRequestGrpc<TExecuteTransactionRequest, TEvExecuteTransactionKeyValueRequest, + Ydb::KeyValue::ExecuteTransactionResult, TEvKeyValue::TEvExecuteTransaction>; + using TBase::TBase; + + bool ValidateRequest(Ydb::StatusIds::StatusCode& /*status*/, NYql::TIssues& /*issues*/) override { + return true; + } + + NACLib::EAccessRights GetRequiredAccessRights() const { + ui32 accessRights = 0; + auto &rec = *this->GetProtoRequest(); + for (auto &command : rec.commands()) { + if (command.has_delete_range()) { + accessRights |= NACLib::EraseRow; + } + if (command.has_rename()) { + accessRights |= NACLib::UpdateRow | NACLib::EraseRow; + } + if (command.has_copy_range()) { + accessRights |= NACLib::UpdateRow; + } + if (command.has_concat() && !command.concat().keep_inputs()) { + accessRights |= NACLib::UpdateRow | NACLib::EraseRow; + } + if (command.has_concat() && command.concat().keep_inputs()) { + accessRights |= NACLib::UpdateRow; + } + if (command.has_write()) { + accessRights |= NACLib::UpdateRow; + } + } + return static_cast<NACLib::EAccessRights>(accessRights); + } +}; + +class TReadRequest + : public TKeyValueRequestGrpc<TReadRequest, TEvReadKeyValueRequest, + Ydb::KeyValue::ReadResult, TEvKeyValue::TEvRead> { +public: + using TBase = TKeyValueRequestGrpc<TReadRequest, TEvReadKeyValueRequest, + Ydb::KeyValue::ReadResult, TEvKeyValue::TEvRead>; + using TBase::TBase; + using TBase::Handle; + STFUNC(StateFunc) { + switch (ev->GetTypeRewrite()) { + default: + return TBase::StateFunc(ev); + } + } + bool ValidateRequest(Ydb::StatusIds::StatusCode& /*status*/, NYql::TIssues& /*issues*/) override { + return true; + } + NACLib::EAccessRights GetRequiredAccessRights() const { + return NACLib::SelectRow; + } +}; + +class TReadRangeRequest + : public TKeyValueRequestGrpc<TReadRangeRequest, TEvReadRangeKeyValueRequest, + Ydb::KeyValue::ReadRangeResult, TEvKeyValue::TEvReadRange> { +public: + using TBase = TKeyValueRequestGrpc<TReadRangeRequest, TEvReadRangeKeyValueRequest, + Ydb::KeyValue::ReadRangeResult, TEvKeyValue::TEvReadRange>; + using TBase::TBase; + using TBase::Handle; + STFUNC(StateFunc) { + switch (ev->GetTypeRewrite()) { + default: + return TBase::StateFunc(ev); + } + } + bool ValidateRequest(Ydb::StatusIds::StatusCode& /*status*/, NYql::TIssues& /*issues*/) override { + return true; + } + NACLib::EAccessRights GetRequiredAccessRights() const { + return NACLib::SelectRow; + } +}; + +class TListRangeRequest + : public TKeyValueRequestGrpc<TListRangeRequest, TEvListRangeKeyValueRequest, + Ydb::KeyValue::ListRangeResult, TEvKeyValue::TEvReadRange> { +public: + using TBase = TKeyValueRequestGrpc<TListRangeRequest, TEvListRangeKeyValueRequest, + Ydb::KeyValue::ListRangeResult, TEvKeyValue::TEvReadRange>; + using TBase::TBase; + using TBase::Handle; + STFUNC(StateFunc) { + switch (ev->GetTypeRewrite()) { + default: + return TBase::StateFunc(ev); + } + } + bool ValidateRequest(Ydb::StatusIds::StatusCode& /*status*/, NYql::TIssues& /*issues*/) override { + return true; + } + NACLib::EAccessRights GetRequiredAccessRights() const { + return NACLib::SelectRow; + } +}; + +class TGetStorageChannelStatusRequest + : public TKeyValueRequestGrpc<TGetStorageChannelStatusRequest, TEvGetStorageChannelStatusKeyValueRequest, + Ydb::KeyValue::GetStorageChannelStatusResult, TEvKeyValue::TEvGetStorageChannelStatus> { +public: + using TBase = TKeyValueRequestGrpc<TGetStorageChannelStatusRequest, TEvGetStorageChannelStatusKeyValueRequest, + Ydb::KeyValue::GetStorageChannelStatusResult, TEvKeyValue::TEvGetStorageChannelStatus>; + using TBase::TBase; + using TBase::Handle; + STFUNC(StateFunc) { + switch (ev->GetTypeRewrite()) { + default: + return TBase::StateFunc(ev); + } + } + bool ValidateRequest(Ydb::StatusIds::StatusCode& /*status*/, NYql::TIssues& /*issues*/) override { + return true; + } + NACLib::EAccessRights GetRequiredAccessRights() const { + return NACLib::DescribeSchema; + } +}; + +} + + +void DoCreateVolumeKeyValue(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { + TActivationContext::AsActorContext().Register(new TCreateVolumeRequest(p.release())); +} + +void DoDropVolumeKeyValue(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { + TActivationContext::AsActorContext().Register(new TDropVolumeRequest(p.release())); +} + +void DoAlterVolumeKeyValue(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { + TActivationContext::AsActorContext().Register(new TAlterVolumeRequest(p.release())); +} + +void DoDescribeVolumeKeyValue(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { + TActivationContext::AsActorContext().Register(new TDescribeVolumeRequest(p.release())); +} + +void DoListLocalPartitionsKeyValue(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { + TActivationContext::AsActorContext().Register(new TListLocalPartitionsRequest(p.release())); +} + +void DoAcquireLockKeyValue(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { + TActivationContext::AsActorContext().Register(new TAcquireLockRequest(p.release())); +} + +void DoExecuteTransactionKeyValue(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { + TActivationContext::AsActorContext().Register(new TExecuteTransactionRequest(p.release())); +} + +void DoReadKeyValue(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { + TActivationContext::AsActorContext().Register(new TReadRequest(p.release())); +} + +void DoReadRangeKeyValue(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { + TActivationContext::AsActorContext().Register(new TReadRangeRequest(p.release())); +} + +void DoListRangeKeyValue(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { + TActivationContext::AsActorContext().Register(new TListRangeRequest(p.release())); +} + +void DoGetStorageChannelStatusKeyValue(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { + TActivationContext::AsActorContext().Register(new TGetStorageChannelStatusRequest(p.release())); +} + +} // namespace NKikimr::NGRpcService diff --git a/ydb/core/grpc_services/service_keyvalue.h b/ydb/core/grpc_services/service_keyvalue.h new file mode 100644 index 0000000000..9bbb430178 --- /dev/null +++ b/ydb/core/grpc_services/service_keyvalue.h @@ -0,0 +1,23 @@ +#pragma once + +#include <memory> + +namespace NKikimr::NGRpcService { + + class IRequestOpCtx; + class IFacilityProvider; + + void DoCreateVolumeKeyValue(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); + void DoDropVolumeKeyValue(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); + void DoAlterVolumeKeyValue(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); + void DoDescribeVolumeKeyValue(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); + void DoListLocalPartitionsKeyValue(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); + + void DoAcquireLockKeyValue(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); + void DoExecuteTransactionKeyValue(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); + void DoReadKeyValue(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); + void DoReadRangeKeyValue(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); + void DoListRangeKeyValue(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); + void DoGetStorageChannelStatusKeyValue(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); + +} // NKikimr::NGRpcService diff --git a/ydb/core/grpc_services/ya.make b/ydb/core/grpc_services/ya.make index 1921bf675a..f7265ec157 100644 --- a/ydb/core/grpc_services/ya.make +++ b/ydb/core/grpc_services/ya.make @@ -46,6 +46,7 @@ SRCS( rpc_import.cpp rpc_import_data.cpp rpc_keep_alive.cpp + rpc_keyvalue.cpp rpc_kh_describe.cpp rpc_kh_snapshots.cpp rpc_kqp_base.cpp diff --git a/ydb/services/CMakeLists.txt b/ydb/services/CMakeLists.txt index 4ec40bc8a3..2073d2df19 100644 --- a/ydb/services/CMakeLists.txt +++ b/ydb/services/CMakeLists.txt @@ -15,6 +15,7 @@ add_subdirectory(dynamic_config) add_subdirectory(ext_index) add_subdirectory(fq) add_subdirectory(kesus) +add_subdirectory(keyvalue) add_subdirectory(lib) add_subdirectory(local_discovery) add_subdirectory(maintenance) diff --git a/ydb/services/keyvalue/CMakeLists.darwin-x86_64.txt b/ydb/services/keyvalue/CMakeLists.darwin-x86_64.txt new file mode 100644 index 0000000000..522f6f9097 --- /dev/null +++ b/ydb/services/keyvalue/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,24 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +add_subdirectory(ut) + +add_library(ydb-services-keyvalue) +target_link_libraries(ydb-services-keyvalue PUBLIC + contrib-libs-cxxsupp + yutil + api-grpc + cpp-grpc-server + ydb-core-grpc_services + core-grpc_services-base + core-kesus-tablet + ydb-core-keyvalue +) +target_sources(ydb-services-keyvalue PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/keyvalue/grpc_service.cpp +) diff --git a/ydb/services/keyvalue/CMakeLists.linux-aarch64.txt b/ydb/services/keyvalue/CMakeLists.linux-aarch64.txt new file mode 100644 index 0000000000..fc41c03b82 --- /dev/null +++ b/ydb/services/keyvalue/CMakeLists.linux-aarch64.txt @@ -0,0 +1,25 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +add_subdirectory(ut) + +add_library(ydb-services-keyvalue) +target_link_libraries(ydb-services-keyvalue PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + api-grpc + cpp-grpc-server + ydb-core-grpc_services + core-grpc_services-base + core-kesus-tablet + ydb-core-keyvalue +) +target_sources(ydb-services-keyvalue PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/keyvalue/grpc_service.cpp +) diff --git a/ydb/services/keyvalue/CMakeLists.linux-x86_64.txt b/ydb/services/keyvalue/CMakeLists.linux-x86_64.txt new file mode 100644 index 0000000000..fc41c03b82 --- /dev/null +++ b/ydb/services/keyvalue/CMakeLists.linux-x86_64.txt @@ -0,0 +1,25 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +add_subdirectory(ut) + +add_library(ydb-services-keyvalue) +target_link_libraries(ydb-services-keyvalue PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + api-grpc + cpp-grpc-server + ydb-core-grpc_services + core-grpc_services-base + core-kesus-tablet + ydb-core-keyvalue +) +target_sources(ydb-services-keyvalue PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/keyvalue/grpc_service.cpp +) diff --git a/ydb/services/keyvalue/CMakeLists.txt b/ydb/services/keyvalue/CMakeLists.txt new file mode 100644 index 0000000000..f8b31df0c1 --- /dev/null +++ b/ydb/services/keyvalue/CMakeLists.txt @@ -0,0 +1,17 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-aarch64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) + include(CMakeLists.windows-x86_64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/ydb/services/keyvalue/CMakeLists.windows-x86_64.txt b/ydb/services/keyvalue/CMakeLists.windows-x86_64.txt new file mode 100644 index 0000000000..522f6f9097 --- /dev/null +++ b/ydb/services/keyvalue/CMakeLists.windows-x86_64.txt @@ -0,0 +1,24 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +add_subdirectory(ut) + +add_library(ydb-services-keyvalue) +target_link_libraries(ydb-services-keyvalue PUBLIC + contrib-libs-cxxsupp + yutil + api-grpc + cpp-grpc-server + ydb-core-grpc_services + core-grpc_services-base + core-kesus-tablet + ydb-core-keyvalue +) +target_sources(ydb-services-keyvalue PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/keyvalue/grpc_service.cpp +) diff --git a/ydb/services/keyvalue/grpc_service.cpp b/ydb/services/keyvalue/grpc_service.cpp new file mode 100644 index 0000000000..8e3b663027 --- /dev/null +++ b/ydb/services/keyvalue/grpc_service.cpp @@ -0,0 +1,81 @@ +#include "grpc_service.h" + +#include <ydb/core/grpc_services/grpc_helper.h> +#include <ydb/core/grpc_services/base/base.h> +#include <ydb/core/grpc_services/service_keyvalue.h> + + +namespace NKikimr::NGRpcService { + +TKeyValueGRpcService::TKeyValueGRpcService(NActors::TActorSystem* actorSystem, TIntrusivePtr<NMonitoring::TDynamicCounters> counters, NActors::TActorId grpcRequestProxyId) + : ActorSystem(actorSystem) + , Counters(std::move(counters)) + , GRpcRequestProxyId(grpcRequestProxyId) +{ +} + +TKeyValueGRpcService::~TKeyValueGRpcService() = default; + +void TKeyValueGRpcService::InitService(grpc::ServerCompletionQueue* cq, NGrpc::TLoggerPtr logger) { + CQ = cq; + SetupIncomingRequests(std::move(logger)); +} + +void TKeyValueGRpcService::SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) { + Limiter = limiter; +} + +bool TKeyValueGRpcService::IncRequest() { + return Limiter->Inc(); +} + +void TKeyValueGRpcService::DecRequest() { + Limiter->Dec(); +} + +void TKeyValueGRpcService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) { + auto getCounterBlock = NGRpcService::CreateCounterCb(Counters, ActorSystem); + +#ifdef SETUP_METHOD +#error SETUP_METHOD macro collision +#endif + +#define SETUP_METHOD(methodName, method, rlMode) \ + MakeIntrusive<NGRpcService::TGRpcRequest< \ + Ydb::KeyValue::Y_CAT(methodName, Request), \ + Ydb::KeyValue::Y_CAT(methodName, Response), \ + TKeyValueGRpcService>> \ + ( \ + this, \ + &Service_, \ + CQ, \ + [this](NGrpc::IRequestContextBase* reqCtx) { \ + NGRpcService::ReportGrpcReqToMon(*ActorSystem, reqCtx->GetPeer()); \ + ActorSystem->Send(GRpcRequestProxyId, new TGrpcRequestOperationCall< \ + Ydb::KeyValue::Y_CAT(methodName, Request), \ + Ydb::KeyValue::Y_CAT(methodName, Response)>(reqCtx, &method, \ + TRequestAuxSettings{rlMode, nullptr})); \ + }, \ + &Ydb::KeyValue::V1::KeyValueService::AsyncService::Y_CAT(Request, methodName), \ + "KeyValue/" Y_STRINGIZE(methodName), \ + logger, \ + getCounterBlock("keyvalue", Y_STRINGIZE(methodName)) \ + )->Run() + + SETUP_METHOD(CreateVolume, DoCreateVolumeKeyValue, TRateLimiterMode::Rps); + SETUP_METHOD(DropVolume, DoDropVolumeKeyValue, TRateLimiterMode::Rps); + SETUP_METHOD(AlterVolume, DoAlterVolumeKeyValue, TRateLimiterMode::Rps); + SETUP_METHOD(DescribeVolume, DoDescribeVolumeKeyValue, TRateLimiterMode::Rps); + SETUP_METHOD(ListLocalPartitions, DoListLocalPartitionsKeyValue, TRateLimiterMode::Rps); + + SETUP_METHOD(AcquireLock, DoAcquireLockKeyValue, TRateLimiterMode::Rps); + SETUP_METHOD(ExecuteTransaction, DoExecuteTransactionKeyValue, TRateLimiterMode::Rps); + SETUP_METHOD(Read, DoReadKeyValue, TRateLimiterMode::Rps); + SETUP_METHOD(ReadRange, DoReadRangeKeyValue, TRateLimiterMode::Rps); + SETUP_METHOD(ListRange, DoListRangeKeyValue, TRateLimiterMode::Rps); + SETUP_METHOD(GetStorageChannelStatus, DoGetStorageChannelStatusKeyValue, TRateLimiterMode::Rps); + +#undef SETUP_METHOD +} + +} // namespace NKikimr::NGRpcService diff --git a/ydb/services/keyvalue/grpc_service.h b/ydb/services/keyvalue/grpc_service.h new file mode 100644 index 0000000000..18161acad1 --- /dev/null +++ b/ydb/services/keyvalue/grpc_service.h @@ -0,0 +1,37 @@ +#pragma once + +#include <ydb/public/api/grpc/ydb_keyvalue_v1.grpc.pb.h> + +#include <library/cpp/grpc/server/grpc_server.h> +#include <library/cpp/actors/core/actorsystem.h> + + +namespace NKikimr::NGRpcService { + +class TKeyValueGRpcService + : public NGrpc::TGrpcServiceBase<Ydb::KeyValue::V1::KeyValueService> +{ +public: + TKeyValueGRpcService(NActors::TActorSystem* actorSystem, TIntrusivePtr<NMonitoring::TDynamicCounters> counters, + NActors::TActorId grpcRequestProxyId); + ~TKeyValueGRpcService(); + + void InitService(grpc::ServerCompletionQueue* cq, NGrpc::TLoggerPtr logger) override; + void SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) override; + + bool IncRequest(); + void DecRequest(); + +private: + void SetupIncomingRequests(NGrpc::TLoggerPtr logger); + +private: + NActors::TActorSystem* ActorSystem = nullptr; + TIntrusivePtr<NMonitoring::TDynamicCounters> Counters; + NActors::TActorId GRpcRequestProxyId; + + grpc::ServerCompletionQueue* CQ = nullptr; + NGrpc::TGlobalLimiter* Limiter = nullptr; +}; + +} // namespace NKikimr::NGRpcService diff --git a/ydb/services/keyvalue/grpc_service_ut.cpp b/ydb/services/keyvalue/grpc_service_ut.cpp new file mode 100644 index 0000000000..131c3ac363 --- /dev/null +++ b/ydb/services/keyvalue/grpc_service_ut.cpp @@ -0,0 +1,838 @@ +#include "grpc_service.h" + +#include <ydb/core/keyvalue/keyvalue.h> +#include <ydb/core/keyvalue/keyvalue_events.h> +#include <ydb/core/protos/config.pb.h> +#include <ydb/core/testlib/basics/appdata.h> +#include <ydb/core/testlib/test_client.h> +#include <ydb/core/tx/scheme_cache/scheme_cache.h> + +#include <ydb/public/api/grpc/ydb_scheme_v1.grpc.pb.h> + +#include <ydb/public/sdk/cpp/client/resources/ydb_resources.h> + +#include <library/cpp/grpc/client/grpc_client_low.h> +#include <library/cpp/testing/unittest/registar.h> +#include <library/cpp/testing/unittest/tests_data.h> +#include <library/cpp/logger/backend.h> + +#include <grpc++/client_context.h> +#include <grpc++/create_channel.h> + +#include <util/string/builder.h> + + +TString PrintIssue(const ::google::protobuf::RepeatedPtrField< ::Ydb::Issue::IssueMessage> &issues) { + TStringBuilder msg; + msg << '{'; + for (auto &issue : issues) { + msg << " issue# " << issue.message(); + } + msg << " }"; + return msg; +} + + +#define UNIT_ASSERT_CHECK_STATUS(got, exp) \ + UNIT_ASSERT_C(got.status() == exp, "exp# " << Ydb::StatusIds::StatusCode_Name(exp) \ + << " got# " << Ydb::StatusIds::StatusCode_Name(got.status()) << " issues# " << PrintIssue(got.issues())) \ +// UNIT_ASSERT_CHECK_STATUS + + +namespace NKikimr::NGRpcService { + + +struct TKikimrTestSettings { + static constexpr bool SSL = false; + static constexpr bool AUTH = false; + static constexpr bool PrecreatePools = true; + static constexpr bool EnableSystemViews = true; +}; + +struct TKikimrTestWithAuth : TKikimrTestSettings { + static constexpr bool AUTH = true; +}; + +struct TKikimrTestWithAuthAndSsl : TKikimrTestWithAuth { + static constexpr bool SSL = true; +}; + +struct TKikimrTestNoSystemViews : TKikimrTestSettings { + static constexpr bool EnableSystemViews = false; +}; + +template <typename TestSettings = TKikimrTestSettings> +class TBasicKikimrWithGrpcAndRootSchema { +public: + TBasicKikimrWithGrpcAndRootSchema( + NKikimrConfig::TAppConfig appConfig = {}, + TAutoPtr<TLogBackend> logBackend = {}) + { + ui16 port = PortManager.GetPort(2134); + ui16 grpc = PortManager.GetPort(2135); + ServerSettings = new Tests::TServerSettings(port); + ServerSettings->SetGrpcPort(grpc); + ServerSettings->SetLogBackend(logBackend); + ServerSettings->SetDomainName("Root"); + ServerSettings->SetDynamicNodeCount(1); + if (TestSettings::PrecreatePools) { + ServerSettings->AddStoragePool("ssd"); + ServerSettings->AddStoragePool("hdd"); + ServerSettings->AddStoragePool("hdd1"); + ServerSettings->AddStoragePool("hdd2"); + } else { + ServerSettings->AddStoragePoolType("ssd"); + ServerSettings->AddStoragePoolType("hdd"); + ServerSettings->AddStoragePoolType("hdd1"); + ServerSettings->AddStoragePoolType("hdd2"); + } + ServerSettings->Formats = new TFormatFactory; + ServerSettings->FeatureFlags = appConfig.GetFeatureFlags(); + ServerSettings->RegisterGrpcService<NKikimr::NGRpcService::TKeyValueGRpcService>("keyvalue"); + + Server_.Reset(new Tests::TServer(*ServerSettings)); + Tenants_.Reset(new Tests::TTenants(Server_)); + + //Server_->GetRuntime()->SetLogPriority(NKikimrServices::TX_PROXY_SCHEME_CACHE, NActors::NLog::PRI_DEBUG); + //Server_->GetRuntime()->SetLogPriority(NKikimrServices::SCHEME_BOARD_REPLICA, NActors::NLog::PRI_DEBUG); + //Server_->GetRuntime()->SetLogPriority(NKikimrServices::SCHEME_BOARD_SUBSCRIBER, NActors::NLog::PRI_TRACE); + //Server_->GetRuntime()->SetLogPriority(NKikimrServices::SCHEME_BOARD_POPULATOR, NActors::NLog::PRI_DEBUG); + //Server_->GetRuntime()->SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_DEBUG); + //Server_->GetRuntime()->SetLogPriority(NKikimrServices::TX_PROXY, NActors::NLog::PRI_DEBUG); + Server_->GetRuntime()->SetLogPriority(NKikimrServices::KEYVALUE, NActors::NLog::PRI_DEBUG); + //Server_->GetRuntime()->SetLogPriority(NKikimrServices::BOOTSTRAPPER, NActors::NLog::PRI_DEBUG); + //Server_->GetRuntime()->SetLogPriority(NKikimrServices::STATESTORAGE, NActors::NLog::PRI_DEBUG); + //Server_->GetRuntime()->SetLogPriority(NKikimrServices::TABLET_EXECUTOR, NActors::NLog::PRI_DEBUG); + //Server_->GetRuntime()->SetLogPriority(NKikimrServices::SAUSAGE_BIO, NActors::NLog::PRI_DEBUG); + //Server_->GetRuntime()->SetLogPriority(NKikimrServices::TABLET_FLATBOOT, NActors::NLog::PRI_DEBUG); + //Server_->GetRuntime()->SetLogPriority(NKikimrServices::TABLET_OPS_HOST, NActors::NLog::PRI_DEBUG); + //Server_->GetRuntime()->SetLogPriority(NKikimrServices::TABLET_SAUSAGECACHE, NActors::NLog::PRI_DEBUG); + //Server_->GetRuntime()->SetLogPriority(NKikimrServices::TX_OLAPSHARD, NActors::NLog::PRI_DEBUG); + //Server_->GetRuntime()->SetLogPriority(NKikimrServices::TX_COLUMNSHARD, NActors::NLog::PRI_DEBUG); + + NGrpc::TServerOptions grpcOption; + if (TestSettings::AUTH) { + grpcOption.SetUseAuth(true); + } + grpcOption.SetPort(grpc); + Server_->EnableGRpc(grpcOption); + + Tests::TClient annoyingClient(*ServerSettings); + if (ServerSettings->AppConfig.GetDomainsConfig().GetSecurityConfig().GetEnforceUserTokenRequirement()) { + annoyingClient.SetSecurityToken("root@builtin"); + } + annoyingClient.InitRootScheme("Root"); + GRpcPort_ = grpc; + } + + ui16 GetPort() { + return GRpcPort_; + } + + TPortManager& GetPortManager() { + return PortManager; + } + + void ResetSchemeCache(TString path, ui32 nodeIndex = 0) { + TTestActorRuntime* runtime = Server_->GetRuntime(); + Tests::TClient annoyingClient(*ServerSettings); + annoyingClient.RefreshPathCache(runtime, path, nodeIndex); + } + + TTestActorRuntime* GetRuntime() { + return Server_->GetRuntime(); + } + + Tests::TServer& GetServer() { + return *Server_; + } + + Tests::TServerSettings::TPtr ServerSettings; + Tests::TServer::TPtr Server_; + THolder<Tests::TTenants> Tenants_; +private: + TPortManager PortManager; + ui16 GRpcPort_; +}; + +using TKikimrWithGrpcAndRootSchema = TBasicKikimrWithGrpcAndRootSchema<TKikimrTestSettings>; + +Y_UNIT_TEST_SUITE(KeyValueGRPCService) { + + void InitTablet(TKikimrWithGrpcAndRootSchema &server, ui64 tabletId) { + server.GetRuntime()->SetScheduledLimit(100); + CreateTestBootstrapper(*server.GetRuntime(), + CreateTestTabletInfo(tabletId, TTabletTypes::KeyValue), + &CreateKeyValueFlat); + NanoSleep(3'000'000'000); + } + + void CmdWrite(ui64 tabletId, const TDeque<TString> &keys, const TDeque<TString> &values, TKikimrWithGrpcAndRootSchema &server) + { + Y_VERIFY(keys.size() == values.size()); + TAutoPtr<IEventHandle> handle; + TEvKeyValue::TEvResponse *result; + THolder<TEvKeyValue::TEvRequest> request; + TActorId edgeActor = server.GetRuntime()->AllocateEdgeActor(); + for (i32 retriesLeft = 2; retriesLeft > 0; --retriesLeft) { + try { + server.GetRuntime()->ResetScheduledCount(); + request.Reset(new TEvKeyValue::TEvRequest); + for (ui64 idx = 0; idx < keys.size(); ++idx) { + auto write = request->Record.AddCmdWrite(); + write->SetKey(keys[idx]); + write->SetValue(values[idx]); + write->SetStorageChannel(NKikimrClient::TKeyValueRequest::MAIN); + write->SetPriority(NKikimrClient::TKeyValueRequest::REALTIME); + } + server.GetRuntime()->SendToPipe(tabletId, edgeActor, request.Release(), 0, GetPipeConfigWithRetries()); + result = server.GetRuntime()->GrabEdgeEvent<TEvKeyValue::TEvResponse>(handle); + UNIT_ASSERT(result); + UNIT_ASSERT(result->Record.HasStatus()); + UNIT_ASSERT_EQUAL(result->Record.GetStatus(), NMsgBusProxy::MSTATUS_OK); + UNIT_ASSERT_VALUES_EQUAL(result->Record.WriteResultSize(), values.size()); + for (ui64 idx = 0; idx < values.size(); ++idx) { + const auto &writeResult = result->Record.GetWriteResult(idx); + UNIT_ASSERT(writeResult.HasStatus()); + UNIT_ASSERT_EQUAL(writeResult.GetStatus(), NKikimrProto::OK); + UNIT_ASSERT(writeResult.HasStatusFlags()); + if (values[idx].size()) { + UNIT_ASSERT(writeResult.GetStatusFlags() & ui32(NKikimrBlobStorage::StatusIsValid)); + } + } + retriesLeft = 0; + } catch (NActors::TSchedulingLimitReachedException) { + UNIT_ASSERT(retriesLeft == 2); + } + } + } + + template <typename TCtx> + void AdjustCtxForDB(TCtx &ctx) { + ctx.AddMetadata(NYdb::YDB_AUTH_TICKET_HEADER, "root@builtin"); + } + + void MakeDirectory(auto &channel, const TString &path) { + std::unique_ptr<Ydb::Scheme::V1::SchemeService::Stub> stub; + stub = Ydb::Scheme::V1::SchemeService::NewStub(channel); + + Ydb::Scheme::MakeDirectoryRequest makeDirectoryRequest; + makeDirectoryRequest.set_path(path); + Ydb::Scheme::MakeDirectoryResponse makeDirectoryResponse; + grpc::ClientContext makeDirectoryCtx; + AdjustCtxForDB(makeDirectoryCtx); + stub->MakeDirectory(&makeDirectoryCtx, makeDirectoryRequest, &makeDirectoryResponse); + UNIT_ASSERT_CHECK_STATUS(makeDirectoryResponse.operation(), Ydb::StatusIds::SUCCESS); + } + + void MakeTable(auto &channel, const TString &path) { + std::unique_ptr<Ydb::KeyValue::V1::KeyValueService::Stub> stub; + stub = Ydb::KeyValue::V1::KeyValueService::NewStub(channel); + + Ydb::KeyValue::CreateVolumeRequest createVolumeRequest; + createVolumeRequest.set_path(path); + createVolumeRequest.set_partition_count(1); + auto *storage_config = createVolumeRequest.mutable_storage_config(); + storage_config->add_channel()->set_media("ssd"); + storage_config->add_channel()->set_media("ssd"); + storage_config->add_channel()->set_media("ssd"); + + Ydb::KeyValue::CreateVolumeResponse createVolumeResponse; + Ydb::KeyValue::CreateVolumeResult createVolumeResult; + + grpc::ClientContext createVolumeCtx; + AdjustCtxForDB(createVolumeCtx); + stub->CreateVolume(&createVolumeCtx, createVolumeRequest, &createVolumeResponse); + UNIT_ASSERT_CHECK_STATUS(createVolumeResponse.operation(), Ydb::StatusIds::SUCCESS); + createVolumeResponse.operation().result().UnpackTo(&createVolumeResult); + } + + void AlterVolume(auto &channel, const TString &path, ui32 partition_count = 1) { + std::unique_ptr<Ydb::KeyValue::V1::KeyValueService::Stub> stub; + stub = Ydb::KeyValue::V1::KeyValueService::NewStub(channel); + + Ydb::KeyValue::AlterVolumeRequest alterVolumeRequest; + alterVolumeRequest.set_path(path); + alterVolumeRequest.set_alter_partition_count(partition_count); + + Ydb::KeyValue::AlterVolumeResponse alterVolumeResponse; + Ydb::KeyValue::AlterVolumeResult alterVolumeResult; + + grpc::ClientContext alterVolumeCtx; + AdjustCtxForDB(alterVolumeCtx); + stub->AlterVolume(&alterVolumeCtx, alterVolumeRequest, &alterVolumeResponse); + UNIT_ASSERT_CHECK_STATUS(alterVolumeResponse.operation(), Ydb::StatusIds::SUCCESS); + alterVolumeResponse.operation().result().UnpackTo(&alterVolumeResult); + } + + void DropVolume(auto &channel, const TString &path) { + std::unique_ptr<Ydb::KeyValue::V1::KeyValueService::Stub> stub; + stub = Ydb::KeyValue::V1::KeyValueService::NewStub(channel); + + Ydb::KeyValue::DropVolumeRequest dropVolumeRequest; + dropVolumeRequest.set_path(path); + + Ydb::KeyValue::DropVolumeResponse dropVolumeResponse; + Ydb::KeyValue::DropVolumeResult dropVolumeResult; + + grpc::ClientContext dropVolumeCtx; + AdjustCtxForDB(dropVolumeCtx); + stub->DropVolume(&dropVolumeCtx, dropVolumeRequest, &dropVolumeResponse); + UNIT_ASSERT_CHECK_STATUS(dropVolumeResponse.operation(), Ydb::StatusIds::SUCCESS); + dropVolumeResponse.operation().result().UnpackTo(&dropVolumeResult); + } + + Ydb::KeyValue::DescribeVolumeResult DescribeVolume(auto &channel, const TString &path) { + std::unique_ptr<Ydb::KeyValue::V1::KeyValueService::Stub> stub; + stub = Ydb::KeyValue::V1::KeyValueService::NewStub(channel); + + Ydb::KeyValue::DescribeVolumeRequest describeVolumeRequest; + describeVolumeRequest.set_path(path); + + Ydb::KeyValue::DescribeVolumeResponse describeVolumeResponse; + Ydb::KeyValue::DescribeVolumeResult describeVolumeResult; + + grpc::ClientContext describeVolumeCtx; + AdjustCtxForDB(describeVolumeCtx); + stub->DescribeVolume(&describeVolumeCtx, describeVolumeRequest, &describeVolumeResponse); + UNIT_ASSERT_CHECK_STATUS(describeVolumeResponse.operation(), Ydb::StatusIds::SUCCESS); + describeVolumeResponse.operation().result().UnpackTo(&describeVolumeResult); + return describeVolumeResult; + } + + + Ydb::Scheme::ListDirectoryResult ListDirectory(auto &channel, const TString &path) { + std::unique_ptr<Ydb::Scheme::V1::SchemeService::Stub> stub; + stub = Ydb::Scheme::V1::SchemeService::NewStub(channel); + Ydb::Scheme::ListDirectoryRequest listDirectoryRequest; + listDirectoryRequest.set_path(path); + + Ydb::Scheme::ListDirectoryResult listDirectoryResult; + Ydb::Scheme::ListDirectoryResponse listDirectoryResponse; + + grpc::ClientContext listDirectoryCtx; + AdjustCtxForDB(listDirectoryCtx); + stub->ListDirectory(&listDirectoryCtx, listDirectoryRequest, &listDirectoryResponse); + + UNIT_ASSERT_CHECK_STATUS(listDirectoryResponse.operation(), Ydb::StatusIds::SUCCESS); + listDirectoryResponse.operation().result().UnpackTo(&listDirectoryResult); + return listDirectoryResult; + } + + ui64 AcquireLock( const TString &path, ui64 partitionId, auto &stub) { + Ydb::KeyValue::AcquireLockRequest request; + request.set_path(path); + request.set_partition_id(partitionId); + + Ydb::KeyValue::AcquireLockResponse response; + Ydb::KeyValue::AcquireLockResult result; + + grpc::ClientContext ctx; + AdjustCtxForDB(ctx); + stub->AcquireLock(&ctx, request, &response); + UNIT_ASSERT_CHECK_STATUS(response.operation(), Ydb::StatusIds::SUCCESS); + response.operation().result().UnpackTo(&result); + return result.lock_generation(); + } + + void WaitTableCreation(TKikimrWithGrpcAndRootSchema &server, const TString &path) { + bool again = true; + for (ui32 i = 0; i < 10 && again; ++i) { + Cerr << "Wait iteration# " << i << Endl; + auto req = MakeHolder<NSchemeCache::TSchemeCacheNavigate>(); + auto& entry = req->ResultSet.emplace_back(); + entry.Path = SplitPath(path); + entry.RequestType = NSchemeCache::TSchemeCacheNavigate::TEntry::ERequestType::ByPath; + entry.ShowPrivatePath = true; + entry.SyncVersion = false; + req->UserToken = new NACLib::TUserToken("root@builtin", {}); + UNIT_ASSERT(req->UserToken); + TActorId edgeActor = server.GetRuntime()->AllocateEdgeActor(); + auto ev = new TEvTxProxySchemeCache::TEvNavigateKeySet(req.Release()); + UNIT_ASSERT(ev->Request->UserToken); + auto schemeCache = MakeSchemeCacheID(); + server.GetRuntime()->Send(new IEventHandle(schemeCache, edgeActor, ev)); + + TAutoPtr<IEventHandle> handle; + auto *result = server.GetRuntime()->GrabEdgeEvent<TEvTxProxySchemeCache::TEvNavigateKeySetResult>(handle); + UNIT_ASSERT_VALUES_EQUAL(result->Request->ResultSet.size(), 1); + again = result->Request->ResultSet[0].Status != NSchemeCache::TSchemeCacheNavigate::EStatus::Ok; + } + } + + void MakeSimpleTest(const TString &tablePath, + std::function<void(const std::unique_ptr<Ydb::KeyValue::V1::KeyValueService::Stub>&)> func) + { + TKikimrWithGrpcAndRootSchema server; + ui16 grpc = server.GetPort(); + TString location = TStringBuilder() << "localhost:" << grpc; + + //////////////////////////////////////////////////////////////////////// + + std::shared_ptr<grpc::Channel> channel; + std::unique_ptr<Ydb::KeyValue::V1::KeyValueService::Stub> stub; + channel = grpc::CreateChannel("localhost:" + ToString(grpc), grpc::InsecureChannelCredentials()); + MakeDirectory(channel, "/Root/mydb"); + MakeTable(channel, tablePath); + auto pr = SplitPath(tablePath); + Ydb::Scheme::ListDirectoryResult listDirectoryResult = ListDirectory(channel, "/Root/mydb"); + UNIT_ASSERT_VALUES_EQUAL(listDirectoryResult.self().name(), "mydb"); + UNIT_ASSERT_VALUES_EQUAL(listDirectoryResult.children(0).name(), pr.back()); + + WaitTableCreation(server, tablePath); + stub = Ydb::KeyValue::V1::KeyValueService::NewStub(channel); + func(stub); + } + + Y_UNIT_TEST(SimpleAcquireLock) { + TString tablePath = "/Root/mydb/kvtable"; + MakeSimpleTest(tablePath, [tablePath](const std::unique_ptr<Ydb::KeyValue::V1::KeyValueService::Stub> &stub){ + Ydb::KeyValue::AcquireLockRequest request; + request.set_path(tablePath); + request.set_partition_id(0); + Ydb::KeyValue::AcquireLockResponse response; + Ydb::KeyValue::AcquireLockResult result; + + grpc::ClientContext ctx1; + AdjustCtxForDB(ctx1); + stub->AcquireLock(&ctx1, request, &response); + UNIT_ASSERT_CHECK_STATUS(response.operation(), Ydb::StatusIds::SUCCESS); + response.operation().result().UnpackTo(&result); + UNIT_ASSERT(result.lock_generation() == 1); + + grpc::ClientContext ctx2; + AdjustCtxForDB(ctx2); + stub->AcquireLock(&ctx2, request, &response); + UNIT_ASSERT_CHECK_STATUS(response.operation(), Ydb::StatusIds::SUCCESS); + response.operation().result().UnpackTo(&result); + UNIT_ASSERT(result.lock_generation() == 2); + }); + } + + Y_UNIT_TEST(SimpleExecuteTransaction) { + TString tablePath = "/Root/mydb/kvtable"; + MakeSimpleTest(tablePath, [tablePath](const std::unique_ptr<Ydb::KeyValue::V1::KeyValueService::Stub> &stub){ + Ydb::KeyValue::ExecuteTransactionRequest request; + request.set_path(tablePath); + request.set_partition_id(0); + Ydb::KeyValue::ExecuteTransactionResponse response; + + grpc::ClientContext ctx; + AdjustCtxForDB(ctx); + stub->ExecuteTransaction(&ctx, request, &response); + UNIT_ASSERT_CHECK_STATUS(response.operation(), Ydb::StatusIds::SUCCESS); + }); + } + + Y_UNIT_TEST(SimpleExecuteTransactionWithWrongGeneration) { + TString tablePath = "/Root/mydb/kvtable"; + MakeSimpleTest(tablePath, [tablePath](const std::unique_ptr<Ydb::KeyValue::V1::KeyValueService::Stub> &stub){ + Ydb::KeyValue::ExecuteTransactionRequest request; + request.set_path(tablePath); + request.set_partition_id(0); + request.set_lock_generation(42); + Ydb::KeyValue::ExecuteTransactionResponse response; + + grpc::ClientContext ctx; + AdjustCtxForDB(ctx); + stub->ExecuteTransaction(&ctx, request, &response); + UNIT_ASSERT_CHECK_STATUS(response.operation(), Ydb::StatusIds::PRECONDITION_FAILED); + }); + } + + Ydb::KeyValue::ExecuteTransactionResult Write(const TString &path, ui64 partitionId, const TString &key, const TString &value, ui64 storageChannel, + const std::unique_ptr<Ydb::KeyValue::V1::KeyValueService::Stub> &stub) + { + Ydb::KeyValue::ExecuteTransactionRequest writeRequest; + writeRequest.set_path(path); + writeRequest.set_partition_id(partitionId); + auto *cmd = writeRequest.add_commands(); + auto *write = cmd->mutable_write(); + write->set_key(key); + write->set_value(value); + write->set_storage_channel(storageChannel); + Ydb::KeyValue::ExecuteTransactionResponse writeResponse; + + grpc::ClientContext writeCtx; + AdjustCtxForDB(writeCtx); + stub->ExecuteTransaction(&writeCtx, writeRequest, &writeResponse); + UNIT_ASSERT_CHECK_STATUS(writeResponse.operation(), Ydb::StatusIds::SUCCESS); + Ydb::KeyValue::ExecuteTransactionResult writeResult; + writeResponse.operation().result().UnpackTo(&writeResult); + return writeResult; + } + + void Rename(const TString &path, ui64 partitionId, const TString &oldKey, const TString &newKey, + const std::unique_ptr<Ydb::KeyValue::V1::KeyValueService::Stub> &stub) + { + Ydb::KeyValue::ExecuteTransactionRequest request; + request.set_path(path); + request.set_partition_id(partitionId); + auto *cmd = request.add_commands(); + auto *rename = cmd->mutable_rename(); + rename->set_old_key(oldKey); + rename->set_new_key(newKey); + Ydb::KeyValue::ExecuteTransactionResponse response; + + grpc::ClientContext ctx; + AdjustCtxForDB(ctx); + stub->ExecuteTransaction(&ctx, request, &response); + UNIT_ASSERT_CHECK_STATUS(response.operation(), Ydb::StatusIds::SUCCESS); + } + + + Y_UNIT_TEST(SimpleRenameUnexistedKey) { + TString tablePath = "/Root/mydb/kvtable"; + MakeSimpleTest(tablePath, [tablePath](const std::unique_ptr<Ydb::KeyValue::V1::KeyValueService::Stub> &stub){ + Ydb::KeyValue::ExecuteTransactionRequest request; + request.set_path(tablePath); + request.set_partition_id(0); + auto *cmd = request.add_commands(); + auto *rename = cmd->mutable_rename(); + rename->set_old_key("key1"); + rename->set_new_key("key2"); + Ydb::KeyValue::ExecuteTransactionResponse response; + + grpc::ClientContext ctx; + AdjustCtxForDB(ctx); + stub->ExecuteTransaction(&ctx, request, &response); + UNIT_ASSERT_CHECK_STATUS(response.operation(), Ydb::StatusIds::NOT_FOUND); + }); + } + + Y_UNIT_TEST(SimpleConcatUnexistedKey) { + TString tablePath = "/Root/mydb/kvtable"; + MakeSimpleTest(tablePath, [tablePath](const std::unique_ptr<Ydb::KeyValue::V1::KeyValueService::Stub> &stub){ + Ydb::KeyValue::ExecuteTransactionRequest request; + request.set_path(tablePath); + request.set_partition_id(0); + auto *cmd = request.add_commands(); + auto *concat = cmd->mutable_concat(); + concat->add_input_keys("key1"); + concat->add_input_keys("key2"); + concat->set_output_key("key3"); + Ydb::KeyValue::ExecuteTransactionResponse response; + + grpc::ClientContext ctx; + AdjustCtxForDB(ctx); + stub->ExecuteTransaction(&ctx, request, &response); + UNIT_ASSERT_CHECK_STATUS(response.operation(), Ydb::StatusIds::NOT_FOUND); + }); + } + + Y_UNIT_TEST(SimpleCopyUnexistedKey) { + TString tablePath = "/Root/mydb/kvtable"; + MakeSimpleTest(tablePath, [tablePath](const std::unique_ptr<Ydb::KeyValue::V1::KeyValueService::Stub> &stub){ + Ydb::KeyValue::ExecuteTransactionRequest request; + request.set_path(tablePath); + request.set_partition_id(0); + auto *cmd = request.add_commands(); + auto *rename = cmd->mutable_copy_range(); + auto *range = rename->mutable_range(); + range->set_from_key_inclusive("key1"); + range->set_to_key_inclusive("key2"); + rename->set_prefix_to_add("A"); + Ydb::KeyValue::ExecuteTransactionResponse response; + + grpc::ClientContext ctx; + AdjustCtxForDB(ctx); + stub->ExecuteTransaction(&ctx, request, &response); + UNIT_ASSERT_CHECK_STATUS(response.operation(), Ydb::StatusIds::SUCCESS); + }); + } + + Y_UNIT_TEST(SimpleWriteRead) { + TString tablePath = "/Root/mydb/kvtable"; + MakeSimpleTest(tablePath, [tablePath](const std::unique_ptr<Ydb::KeyValue::V1::KeyValueService::Stub> &stub){ + Write(tablePath, 0, "key", "value", 0, stub); + + Ydb::KeyValue::ReadRequest readRequest; + readRequest.set_path(tablePath); + readRequest.set_partition_id(0); + readRequest.set_key("key"); + Ydb::KeyValue::ReadResponse readResponse; + Ydb::KeyValue::ReadResult readResult; + + grpc::ClientContext readCtx; + AdjustCtxForDB(readCtx); + stub->Read(&readCtx, readRequest, &readResponse); + UNIT_ASSERT_CHECK_STATUS(readResponse.operation(), Ydb::StatusIds::SUCCESS); + readResponse.operation().result().UnpackTo(&readResult); + UNIT_ASSERT(!readResult.is_overrun()); + UNIT_ASSERT_VALUES_EQUAL(readResult.requested_key(), "key"); + UNIT_ASSERT_VALUES_EQUAL(readResult.value(), "value"); + UNIT_ASSERT_VALUES_EQUAL(readResult.requested_offset(), 0); + UNIT_ASSERT_VALUES_EQUAL(readResult.requested_size(), 5); + }); + } + + Y_UNIT_TEST(SimpleWriteReadWithIncorreectPath) { + TString tablePath = "/Root/mydb/kvtable"; + MakeSimpleTest(tablePath, [tablePath](const std::unique_ptr<Ydb::KeyValue::V1::KeyValueService::Stub> &stub){ + Write(tablePath, 0, "key", "value", 0, stub); + + Ydb::KeyValue::ReadRequest readRequest; + readRequest.set_path("/Root/mydb/table"); + readRequest.set_partition_id(0); + readRequest.set_key("key"); + Ydb::KeyValue::ReadResponse readResponse; + Ydb::KeyValue::ReadResult readResult; + + grpc::ClientContext readCtx; + AdjustCtxForDB(readCtx); + stub->Read(&readCtx, readRequest, &readResponse); + UNIT_ASSERT_CHECK_STATUS(readResponse.operation(), Ydb::StatusIds::SCHEME_ERROR); + }); + } + + Y_UNIT_TEST(SimpleWriteReadWithoutToken) { + TString tablePath = "/Root/mydb/kvtable"; + MakeSimpleTest(tablePath, [tablePath](const std::unique_ptr<Ydb::KeyValue::V1::KeyValueService::Stub> &stub){ + return; + Write(tablePath, 0, "key", "value", 0, stub); + + Ydb::KeyValue::ReadRequest readRequest; + readRequest.set_path("/Root/mydb/kvtable"); + readRequest.set_partition_id(0); + readRequest.set_key("key"); + Ydb::KeyValue::ReadResponse readResponse; + Ydb::KeyValue::ReadResult readResult; + + grpc::ClientContext readCtx; + //AdjustCtxForDB(readCtx); + stub->Read(&readCtx, readRequest, &readResponse); + UNIT_ASSERT_CHECK_STATUS(readResponse.operation(), Ydb::StatusIds::SCHEME_ERROR); + }); + } + + Y_UNIT_TEST(SimpleWriteReadWithoutLockGeneration1) { + TString tablePath = "/Root/mydb/kvtable"; + MakeSimpleTest(tablePath, [tablePath](const std::unique_ptr<Ydb::KeyValue::V1::KeyValueService::Stub> &stub){ + AcquireLock(tablePath, 0, stub); + Write(tablePath, 0, "key", "value", 0, stub); + Ydb::KeyValue::ReadRequest readRequest; + readRequest.set_path(tablePath); + readRequest.set_partition_id(0); + readRequest.set_key("key"); + Ydb::KeyValue::ReadResponse readResponse; + Ydb::KeyValue::ReadResult readResult; + + grpc::ClientContext readCtx; + AdjustCtxForDB(readCtx); + stub->Read(&readCtx, readRequest, &readResponse); + UNIT_ASSERT_CHECK_STATUS(readResponse.operation(), Ydb::StatusIds::SUCCESS); + }); + } + + Y_UNIT_TEST(SimpleWriteReadWithoutLockGeneration2) { + TString tablePath = "/Root/mydb/kvtable"; + MakeSimpleTest(tablePath, [tablePath](const std::unique_ptr<Ydb::KeyValue::V1::KeyValueService::Stub> &stub){ + Write(tablePath, 0, "key", "value", 0, stub); + AcquireLock(tablePath, 0, stub); + Ydb::KeyValue::ReadRequest readRequest; + readRequest.set_path(tablePath); + readRequest.set_partition_id(0); + readRequest.set_key("key"); + Ydb::KeyValue::ReadResponse readResponse; + Ydb::KeyValue::ReadResult readResult; + + grpc::ClientContext readCtx; + AdjustCtxForDB(readCtx); + stub->Read(&readCtx, readRequest, &readResponse); + UNIT_ASSERT_CHECK_STATUS(readResponse.operation(), Ydb::StatusIds::SUCCESS); + }); + } + + Y_UNIT_TEST(SimpleWriteReadOverrun) { + TString tablePath = "/Root/mydb/kvtable"; + MakeSimpleTest(tablePath, [tablePath](const std::unique_ptr<Ydb::KeyValue::V1::KeyValueService::Stub> &stub){ + Write(tablePath, 0, "key", "value", 0, stub); + + Ydb::KeyValue::ReadRequest readRequest; + readRequest.set_path(tablePath); + readRequest.set_partition_id(0); + readRequest.set_key("key"); + ui64 limitBytes = 1 + 5 + 3 // Key id, length + + 1 + 5 + 1 // Value id, length, value + + 1 + 8 // Offset id, value + + 1 + 8 // Size id, value + + 1 + 1 // Status id, value + ; + readRequest.set_limit_bytes(limitBytes); + Ydb::KeyValue::ReadResponse readResponse; + Ydb::KeyValue::ReadResult readResult; + + grpc::ClientContext readCtx; + AdjustCtxForDB(readCtx); + stub->Read(&readCtx, readRequest, &readResponse); + UNIT_ASSERT_CHECK_STATUS(readResponse.operation(), Ydb::StatusIds::SUCCESS); + readResponse.operation().result().UnpackTo(&readResult); + UNIT_ASSERT(readResult.is_overrun()); + UNIT_ASSERT_VALUES_EQUAL(readResult.requested_key(), "key"); + UNIT_ASSERT_VALUES_EQUAL(readResult.value(), "v"); + UNIT_ASSERT_VALUES_EQUAL(readResult.requested_offset(), 0); + UNIT_ASSERT_VALUES_EQUAL(readResult.requested_size(), 5); + }); + } + + Y_UNIT_TEST(SimpleWriteReadRange) { + TString tablePath = "/Root/mydb/kvtable"; + MakeSimpleTest(tablePath, [tablePath](const std::unique_ptr<Ydb::KeyValue::V1::KeyValueService::Stub> &stub){ + Write(tablePath, 0, "key1", "value1", 1, stub); + Write(tablePath, 0, "key2", "value12", 2, stub); + + Ydb::KeyValue::ReadRangeRequest readRangeRequest; + readRangeRequest.set_path(tablePath); + readRangeRequest.set_partition_id(0); + auto *r = readRangeRequest.mutable_range(); + r->set_from_key_inclusive("key1"); + r->set_to_key_inclusive("key3"); + Ydb::KeyValue::ReadRangeResponse readRangeResponse; + Ydb::KeyValue::ReadRangeResult readRangeResult; + + grpc::ClientContext readRangeCtx; + AdjustCtxForDB(readRangeCtx); + stub->ReadRange(&readRangeCtx, readRangeRequest, &readRangeResponse); + UNIT_ASSERT_CHECK_STATUS(readRangeResponse.operation(), Ydb::StatusIds::SUCCESS); + readRangeResponse.operation().result().UnpackTo(&readRangeResult); + + UNIT_ASSERT_VALUES_EQUAL(readRangeResult.pair(0).key(), "key1"); + UNIT_ASSERT_VALUES_EQUAL(readRangeResult.pair(1).key(), "key2"); + + UNIT_ASSERT_VALUES_EQUAL(readRangeResult.pair(0).value(), "value1"); + UNIT_ASSERT_VALUES_EQUAL(readRangeResult.pair(1).value(), "value12"); + + UNIT_ASSERT_VALUES_EQUAL(readRangeResult.pair(0).storage_channel(), 1); + UNIT_ASSERT_VALUES_EQUAL(readRangeResult.pair(1).storage_channel(), 2); + }); + } + + + Y_UNIT_TEST(SimpleWriteListRange) { + TString tablePath = "/Root/mydb/kvtable"; + MakeSimpleTest(tablePath, [tablePath](const std::unique_ptr<Ydb::KeyValue::V1::KeyValueService::Stub> &stub){ + Write(tablePath, 0, "key1", "value1", 1, stub); + Write(tablePath, 0, "key2", "value12", 2, stub); + + Ydb::KeyValue::ListRangeRequest listRangeRequest; + listRangeRequest.set_path(tablePath); + listRangeRequest.set_partition_id(0); + auto *r = listRangeRequest.mutable_range(); + r->set_from_key_inclusive("key1"); + r->set_to_key_inclusive("key3"); + Ydb::KeyValue::ListRangeResponse listRangeResponse; + Ydb::KeyValue::ListRangeResult listRangeResult; + + grpc::ClientContext listRangeCtx; + AdjustCtxForDB(listRangeCtx); + stub->ListRange(&listRangeCtx, listRangeRequest, &listRangeResponse); + UNIT_ASSERT_CHECK_STATUS(listRangeResponse.operation(), Ydb::StatusIds::SUCCESS); + listRangeResponse.operation().result().UnpackTo(&listRangeResult); + + UNIT_ASSERT_VALUES_EQUAL(listRangeResult.key(0).key(), "key1"); + UNIT_ASSERT_VALUES_EQUAL(listRangeResult.key(1).key(), "key2"); + + UNIT_ASSERT_VALUES_EQUAL(listRangeResult.key(0).value_size(), 6); + UNIT_ASSERT_VALUES_EQUAL(listRangeResult.key(1).value_size(), 7); + + UNIT_ASSERT_VALUES_EQUAL(listRangeResult.key(0).storage_channel(), 1); + UNIT_ASSERT_VALUES_EQUAL(listRangeResult.key(1).storage_channel(), 2); + }); + } + + + Y_UNIT_TEST(SimpleGetStorageChannelStatus) { + TString tablePath = "/Root/mydb/kvtable"; + MakeSimpleTest(tablePath, [tablePath](const std::unique_ptr<Ydb::KeyValue::V1::KeyValueService::Stub> &stub){ + Ydb::KeyValue::GetStorageChannelStatusRequest getStatusRequest; + getStatusRequest.set_path(tablePath); + getStatusRequest.set_partition_id(0); + getStatusRequest.add_storage_channel(1); + getStatusRequest.add_storage_channel(2); + getStatusRequest.add_storage_channel(3); + Ydb::KeyValue::GetStorageChannelStatusResponse getStatusResponse; + Ydb::KeyValue::GetStorageChannelStatusResult getStatusResult; + + grpc::ClientContext getStatusCtx; + AdjustCtxForDB(getStatusCtx); + stub->GetStorageChannelStatus(&getStatusCtx, getStatusRequest, &getStatusResponse); + UNIT_ASSERT_CHECK_STATUS(getStatusResponse.operation(), Ydb::StatusIds::SUCCESS); + getStatusResponse.operation().result().UnpackTo(&getStatusResult); + UNIT_ASSERT_VALUES_EQUAL(getStatusResult.storage_channel_info_size(), 3); + }); + } + + Y_UNIT_TEST(SimpleCreateAlterDropVolume) { + TKikimrWithGrpcAndRootSchema server; + ui16 grpc = server.GetPort(); + TString location = TStringBuilder() << "localhost:" << grpc; + + std::shared_ptr<grpc::Channel> channel; + channel = grpc::CreateChannel("localhost:" + ToString(grpc), grpc::InsecureChannelCredentials()); + + TString path = "/Root/mydb/"; + TString tablePath = "/Root/mydb/mytable"; + MakeDirectory(channel, path); + MakeTable(channel, tablePath); + + Ydb::Scheme::ListDirectoryResult listDirectoryResult = ListDirectory(channel, path); + UNIT_ASSERT_VALUES_EQUAL(listDirectoryResult.self().name(), "mydb"); + UNIT_ASSERT_VALUES_EQUAL(listDirectoryResult.children(0).name(), "mytable"); + + UNIT_ASSERT_VALUES_EQUAL(1, DescribeVolume(channel, tablePath).partition_count()); + + AlterVolume(channel, tablePath, 2); + listDirectoryResult = ListDirectory(channel, path); + UNIT_ASSERT_VALUES_EQUAL(listDirectoryResult.self().name(), "mydb"); + UNIT_ASSERT_VALUES_EQUAL(listDirectoryResult.children(0).name(), "mytable"); + + + UNIT_ASSERT_VALUES_EQUAL(2, DescribeVolume(channel, tablePath).partition_count()); + + DropVolume(channel, tablePath); + listDirectoryResult = ListDirectory(channel, path); + UNIT_ASSERT_VALUES_EQUAL(listDirectoryResult.self().name(), "mydb"); + UNIT_ASSERT_VALUES_EQUAL(listDirectoryResult.children_size(), 0); + } + + Y_UNIT_TEST(SimpleListPartitions) { + return; // delete it after adding ydb_token to requests in tests + TKikimrWithGrpcAndRootSchema server; + ui16 grpc = server.GetPort(); + TString location = TStringBuilder() << "localhost:" << grpc; + + std::shared_ptr<grpc::Channel> channel; + channel = grpc::CreateChannel("localhost:" + ToString(grpc), grpc::InsecureChannelCredentials()); + + TString path = "/Root/mydb/"; + TString tablePath = "/Root/mydb/mytable"; + MakeDirectory(channel, path); + MakeTable(channel, tablePath); + + std::unique_ptr<Ydb::KeyValue::V1::KeyValueService::Stub> stub; + stub = Ydb::KeyValue::V1::KeyValueService::NewStub(channel); + + Write(tablePath, 0, "key1", "value1", 1, stub); + + Ydb::KeyValue::ListLocalPartitionsRequest enumerateRequest; + enumerateRequest.set_path(tablePath); + enumerateRequest.set_node_id(2); + + Ydb::KeyValue::ListLocalPartitionsResult enumerateResult; + Ydb::KeyValue::ListLocalPartitionsResponse eumerateResponse; + + grpc::ClientContext enumerateCtx; + AdjustCtxForDB(enumerateCtx); + stub->ListLocalPartitions(&enumerateCtx, enumerateRequest, &eumerateResponse); + + UNIT_ASSERT_CHECK_STATUS(eumerateResponse.operation(), Ydb::StatusIds::SUCCESS); + eumerateResponse.operation().result().UnpackTo(&enumerateResult); + UNIT_ASSERT_VALUES_EQUAL(enumerateResult.partition_ids_size(), 1); + + auto writeRes = Write(tablePath, enumerateResult.partition_ids(0), "key2", "value2", 1, stub); + UNIT_ASSERT_VALUES_EQUAL(writeRes.node_id(), 2); + } + +} // Y_UNIT_TEST_SUITE(KeyValueGRPCService) + +} // NKikimr::NGRpcService diff --git a/ydb/services/keyvalue/ut/CMakeLists.darwin-x86_64.txt b/ydb/services/keyvalue/ut/CMakeLists.darwin-x86_64.txt new file mode 100644 index 0000000000..2fbf1f4e2d --- /dev/null +++ b/ydb/services/keyvalue/ut/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,80 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(ydb-services-keyvalue-ut) +target_compile_options(ydb-services-keyvalue-ut PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_include_directories(ydb-services-keyvalue-ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/keyvalue +) +target_link_libraries(ydb-services-keyvalue-ut PUBLIC + contrib-libs-cxxsupp + yutil + library-cpp-cpuid_check + cpp-testing-unittest_main + ydb-services-keyvalue + library-cpp-logger + ydb-core-protos + core-testlib-default +) +target_link_options(ydb-services-keyvalue-ut PRIVATE + -Wl,-platform_version,macos,11.0,11.0 + -fPIC + -fPIC + -framework + CoreFoundation +) +target_sources(ydb-services-keyvalue-ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/keyvalue/grpc_service_ut.cpp +) +set_property( + TARGET + ydb-services-keyvalue-ut + PROPERTY + SPLIT_FACTOR + 1 +) +add_yunittest( + NAME + ydb-services-keyvalue-ut + TEST_TARGET + ydb-services-keyvalue-ut + TEST_ARG + --print-before-suite + --print-before-test + --fork-tests + --print-times + --show-fails +) +set_yunittest_property( + TEST + ydb-services-keyvalue-ut + PROPERTY + LABELS + MEDIUM +) +set_yunittest_property( + TEST + ydb-services-keyvalue-ut + PROPERTY + PROCESSORS + 1 +) +set_yunittest_property( + TEST + ydb-services-keyvalue-ut + PROPERTY + TIMEOUT + 100 +) +target_allocator(ydb-services-keyvalue-ut + system_allocator +) +vcs_info(ydb-services-keyvalue-ut) diff --git a/ydb/services/keyvalue/ut/CMakeLists.linux-aarch64.txt b/ydb/services/keyvalue/ut/CMakeLists.linux-aarch64.txt new file mode 100644 index 0000000000..0d0ad06718 --- /dev/null +++ b/ydb/services/keyvalue/ut/CMakeLists.linux-aarch64.txt @@ -0,0 +1,83 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(ydb-services-keyvalue-ut) +target_compile_options(ydb-services-keyvalue-ut PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_include_directories(ydb-services-keyvalue-ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/keyvalue +) +target_link_libraries(ydb-services-keyvalue-ut PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + cpp-testing-unittest_main + ydb-services-keyvalue + library-cpp-logger + ydb-core-protos + core-testlib-default +) +target_link_options(ydb-services-keyvalue-ut PRIVATE + -ldl + -lrt + -Wl,--no-as-needed + -fPIC + -fPIC + -lpthread + -lrt + -ldl +) +target_sources(ydb-services-keyvalue-ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/keyvalue/grpc_service_ut.cpp +) +set_property( + TARGET + ydb-services-keyvalue-ut + PROPERTY + SPLIT_FACTOR + 1 +) +add_yunittest( + NAME + ydb-services-keyvalue-ut + TEST_TARGET + ydb-services-keyvalue-ut + TEST_ARG + --print-before-suite + --print-before-test + --fork-tests + --print-times + --show-fails +) +set_yunittest_property( + TEST + ydb-services-keyvalue-ut + PROPERTY + LABELS + MEDIUM +) +set_yunittest_property( + TEST + ydb-services-keyvalue-ut + PROPERTY + PROCESSORS + 1 +) +set_yunittest_property( + TEST + ydb-services-keyvalue-ut + PROPERTY + TIMEOUT + 100 +) +target_allocator(ydb-services-keyvalue-ut + cpp-malloc-jemalloc +) +vcs_info(ydb-services-keyvalue-ut) diff --git a/ydb/services/keyvalue/ut/CMakeLists.linux-x86_64.txt b/ydb/services/keyvalue/ut/CMakeLists.linux-x86_64.txt new file mode 100644 index 0000000000..baa6125f02 --- /dev/null +++ b/ydb/services/keyvalue/ut/CMakeLists.linux-x86_64.txt @@ -0,0 +1,85 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(ydb-services-keyvalue-ut) +target_compile_options(ydb-services-keyvalue-ut PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_include_directories(ydb-services-keyvalue-ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/keyvalue +) +target_link_libraries(ydb-services-keyvalue-ut PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + library-cpp-cpuid_check + cpp-testing-unittest_main + ydb-services-keyvalue + library-cpp-logger + ydb-core-protos + core-testlib-default +) +target_link_options(ydb-services-keyvalue-ut PRIVATE + -ldl + -lrt + -Wl,--no-as-needed + -fPIC + -fPIC + -lpthread + -lrt + -ldl +) +target_sources(ydb-services-keyvalue-ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/keyvalue/grpc_service_ut.cpp +) +set_property( + TARGET + ydb-services-keyvalue-ut + PROPERTY + SPLIT_FACTOR + 1 +) +add_yunittest( + NAME + ydb-services-keyvalue-ut + TEST_TARGET + ydb-services-keyvalue-ut + TEST_ARG + --print-before-suite + --print-before-test + --fork-tests + --print-times + --show-fails +) +set_yunittest_property( + TEST + ydb-services-keyvalue-ut + PROPERTY + LABELS + MEDIUM +) +set_yunittest_property( + TEST + ydb-services-keyvalue-ut + PROPERTY + PROCESSORS + 1 +) +set_yunittest_property( + TEST + ydb-services-keyvalue-ut + PROPERTY + TIMEOUT + 100 +) +target_allocator(ydb-services-keyvalue-ut + cpp-malloc-tcmalloc + libs-tcmalloc-no_percpu_cache +) +vcs_info(ydb-services-keyvalue-ut) diff --git a/ydb/services/keyvalue/ut/CMakeLists.txt b/ydb/services/keyvalue/ut/CMakeLists.txt new file mode 100644 index 0000000000..f8b31df0c1 --- /dev/null +++ b/ydb/services/keyvalue/ut/CMakeLists.txt @@ -0,0 +1,17 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-aarch64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) + include(CMakeLists.windows-x86_64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/ydb/services/keyvalue/ut/CMakeLists.windows-x86_64.txt b/ydb/services/keyvalue/ut/CMakeLists.windows-x86_64.txt new file mode 100644 index 0000000000..6bfcb33387 --- /dev/null +++ b/ydb/services/keyvalue/ut/CMakeLists.windows-x86_64.txt @@ -0,0 +1,73 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(ydb-services-keyvalue-ut) +target_compile_options(ydb-services-keyvalue-ut PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_include_directories(ydb-services-keyvalue-ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/keyvalue +) +target_link_libraries(ydb-services-keyvalue-ut PUBLIC + contrib-libs-cxxsupp + yutil + library-cpp-cpuid_check + cpp-testing-unittest_main + ydb-services-keyvalue + library-cpp-logger + ydb-core-protos + core-testlib-default +) +target_sources(ydb-services-keyvalue-ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/keyvalue/grpc_service_ut.cpp +) +set_property( + TARGET + ydb-services-keyvalue-ut + PROPERTY + SPLIT_FACTOR + 1 +) +add_yunittest( + NAME + ydb-services-keyvalue-ut + TEST_TARGET + ydb-services-keyvalue-ut + TEST_ARG + --print-before-suite + --print-before-test + --fork-tests + --print-times + --show-fails +) +set_yunittest_property( + TEST + ydb-services-keyvalue-ut + PROPERTY + LABELS + MEDIUM +) +set_yunittest_property( + TEST + ydb-services-keyvalue-ut + PROPERTY + PROCESSORS + 1 +) +set_yunittest_property( + TEST + ydb-services-keyvalue-ut + PROPERTY + TIMEOUT + 100 +) +target_allocator(ydb-services-keyvalue-ut + system_allocator +) +vcs_info(ydb-services-keyvalue-ut) diff --git a/ydb/services/keyvalue/ut/ya.make b/ydb/services/keyvalue/ut/ya.make new file mode 100644 index 0000000000..22227dc72a --- /dev/null +++ b/ydb/services/keyvalue/ut/ya.make @@ -0,0 +1,19 @@ +UNITTEST_FOR(ydb/services/keyvalue) + +SIZE(MEDIUM) + +TIMEOUT(100) + +SRCS( + grpc_service_ut.cpp +) + +PEERDIR( + library/cpp/logger + ydb/core/protos + ydb/core/testlib/default +) + +YQL_LAST_ABI_VERSION() + +END() diff --git a/ydb/services/keyvalue/ya.make b/ydb/services/keyvalue/ya.make new file mode 100644 index 0000000000..7250b9aef4 --- /dev/null +++ b/ydb/services/keyvalue/ya.make @@ -0,0 +1,20 @@ +LIBRARY() + +SRCS( + grpc_service.cpp +) + +PEERDIR( + ydb/public/api/grpc + library/cpp/grpc/server + ydb/core/grpc_services + ydb/core/grpc_services/base + ydb/core/kesus/tablet + ydb/core/keyvalue +) + +END() + +RECURSE_FOR_TESTS( + ut +) diff --git a/ydb/services/ya.make b/ydb/services/ya.make index a89961e2ba..da97baa73c 100644 --- a/ydb/services/ya.make +++ b/ydb/services/ya.make @@ -7,6 +7,7 @@ RECURSE( discovery fq kesus + keyvalue lib local_discovery maintenance |