diff options
author | mokhotskii <[email protected]> | 2022-03-31 10:58:45 +0300 |
---|---|---|
committer | mokhotskii <[email protected]> | 2022-03-31 10:58:45 +0300 |
commit | 6e3564f0fc6d9329f2f36fd10479488bc3876645 (patch) | |
tree | eb02b68a2958e11d150e3a3f283cfbcb3eda5c13 | |
parent | a94418dd6eb3881482c83a06e5477fb6be2c8571 (diff) |
CLOUD-95800 Add list of non chargeable users for YDS
Add list of non chargeable users in YDS PQConfig
ref:8c88b367968ae05c8b4bcc177b943489ab593efd
-rw-r--r-- | ydb/core/protos/pqconfig.proto | 2 | ||||
-rw-r--r-- | ydb/services/datastreams/datastreams_ut.cpp | 83 | ||||
-rw-r--r-- | ydb/services/datastreams/put_records_actor.h | 55 |
3 files changed, 112 insertions, 28 deletions
diff --git a/ydb/core/protos/pqconfig.proto b/ydb/core/protos/pqconfig.proto index e0343e21409..4c878a48f69 100644 --- a/ydb/core/protos/pqconfig.proto +++ b/ydb/core/protos/pqconfig.proto @@ -142,6 +142,8 @@ message TPQConfig { optional uint32 MetaCacheRefreshIntervalMilliSeconds = 41 [default = 10000]; optional bool MetaCacheSkipVersionCheck = 42 [default = false]; + + repeated string NonChargeableUser = 44; } message TChannelProfile { diff --git a/ydb/services/datastreams/datastreams_ut.cpp b/ydb/services/datastreams/datastreams_ut.cpp index 27847e1aceb..d1ccb6f619a 100644 --- a/ydb/services/datastreams/datastreams_ut.cpp +++ b/ydb/services/datastreams/datastreams_ut.cpp @@ -28,6 +28,9 @@ struct WithSslAndAuth : TKikimrTestSettings { }; using TKikimrWithGrpcAndRootSchemaSecure = NYdb::TBasicKikimrWithGrpcAndRootSchema<WithSslAndAuth>; +static constexpr const char NON_CHARGEABLE_USER[] = "superuser@bultin"; +static constexpr const char NON_CHARGEABLE_USER_X[] = "superuser_x@bultin"; +static constexpr const char NON_CHARGEABLE_USER_Y[] = "superuser_y@bultin"; template<class TKikimr, bool secure> class TDatastreamsTestServer { @@ -43,6 +46,9 @@ public: appConfig.MutablePQConfig()->MutableBillingMeteringConfig()->SetEnabled(true); appConfig.MutablePQConfig()->MutableBillingMeteringConfig()->SetFlushIntervalSec(1); appConfig.MutablePQConfig()->AddClientServiceType()->SetName("data-streams"); + appConfig.MutablePQConfig()->AddNonChargeableUser(NON_CHARGEABLE_USER); + appConfig.MutablePQConfig()->AddNonChargeableUser(NON_CHARGEABLE_USER_X); + appConfig.MutablePQConfig()->AddNonChargeableUser(NON_CHARGEABLE_USER_Y); MeteringFile = MakeHolder<TTempFileHandle>("meteringData.txt"); appConfig.MutableMeteringConfig()->SetMeteringFilePath(MeteringFile->Name()); @@ -92,7 +98,8 @@ public: using TInsecureDatastreamsTestServer = TDatastreamsTestServer<TKikimrWithGrpcAndRootSchema, false>; using TSecureDatastreamsTestServer = TDatastreamsTestServer<TKikimrWithGrpcAndRootSchemaSecure, true>; -void CheckMeteringFile(TTempFileHandle* meteringFile, const TString& streamPath) { +void CheckMeteringFile(TTempFileHandle* meteringFile, const TString& streamPath, + std::function<void(const NJson::TJsonValue::TMapType& map)> usage_check) { Sleep(TDuration::Seconds(1)); meteringFile->Flush(); meteringFile->Close(); @@ -121,9 +128,7 @@ void CheckMeteringFile(TTempFileHandle* meteringFile, const TString& streamPath) if (!tags.empty()) { UNIT_ASSERT_VALUES_EQUAL(tags.size(), 3); } - UNIT_ASSERT(map.contains("usage")); - auto& usage = map.find("usage")->second.GetMap(); - UNIT_ASSERT(usage.find("quantity")->second.GetInteger() >= 0); + usage_check(map); } UNIT_ASSERT(totalLines >= 2); } @@ -131,6 +136,14 @@ void CheckMeteringFile(TTempFileHandle* meteringFile, const TString& streamPath) #define Y_UNIT_TEST_NAME this->Name_; + +#define SET_YDS_LOCALS \ + auto& kikimr = testServer.KikimrServer->Server_; \ + Y_UNUSED(kikimr); \ + auto& driver = testServer.Driver; \ + Y_UNUSED(driver); \ + + Y_UNIT_TEST_SUITE(DataStreams) { Y_UNIT_TEST(TestControlPlaneAndMeteringData) { @@ -242,7 +255,43 @@ Y_UNIT_TEST_SUITE(DataStreams) { UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false); UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SCHEME_ERROR); } - CheckMeteringFile(testServer.MeteringFile.Get(), "/Root/" + streamName); + CheckMeteringFile(testServer.MeteringFile.Get(), "/Root/" + streamName, + [](const NJson::TJsonValue::TMapType& map) { + UNIT_ASSERT(map.contains("usage")); + auto& usage = map.find("usage")->second.GetMap(); + UNIT_ASSERT(usage.find("quantity")->second.GetInteger() >= 0); + }); + } + + Y_UNIT_TEST(TestNonChargeableUser) { + TInsecureDatastreamsTestServer testServer; + const TString streamName = TStringBuilder() << "stream_" << Y_UNIT_TEST_NAME; + SET_YDS_LOCALS; + NYDS_V1::TDataStreamsClient client(*driver, TCommonClientSettings().AuthToken(NON_CHARGEABLE_USER)); + + { + auto result = client.CreateStream(streamName, + NYDS_V1::TCreateStreamSettings().ShardCount(3)).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false); + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS); + } + + { // for metering purposes + std::vector<NYDS_V1::TDataRecord> records; + for (ui32 i = 1; i <= 30; ++i) { + TString data = Sprintf("%04u", i); + records.push_back({data, data, ""}); + } + auto result = client.PutRecords(streamName, records).ExtractValueSync(); + Cerr << result.GetResult().DebugString() << Endl; + UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false); + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS); + } + + Sleep(TDuration::Seconds(1)); + testServer.MeteringFile.Get()->Flush(); + UNIT_ASSERT_VALUES_EQUAL(testServer.MeteringFile.Get()->GetLength(), 0); + testServer.MeteringFile.Get()->Close(); } Y_UNIT_TEST(TestCreateExistingStream) { @@ -516,13 +565,27 @@ Y_UNIT_TEST_SUITE(DataStreams) { } + Y_UNIT_TEST(TestPutRecordsOfAnauthorizedUser) { + TInsecureDatastreamsTestServer testServer; + const TString streamName = TStringBuilder() << "stream_" << Y_UNIT_TEST_NAME; + SET_YDS_LOCALS; + { + auto result = testServer.DataStreamsClient->CreateStream(streamName, + NYDS_V1::TCreateStreamSettings().ShardCount(5)).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false); + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS); + } + + NYDS_V1::TDataStreamsClient client(*driver, TCommonClientSettings().AuthToken("")); -#define SET_YDS_LOCALS \ - auto& kikimr = testServer.KikimrServer->Server_; \ - Y_UNUSED(kikimr); \ - auto& driver = testServer.Driver; \ - Y_UNUSED(driver); \ + const TString dataStr = "9876543210"; + auto putRecordResult = client.PutRecord("/Root/" + streamName, {dataStr, dataStr, dataStr}).ExtractValueSync(); + Cerr << putRecordResult.GetResult().DebugString() << Endl; + + UNIT_ASSERT_VALUES_EQUAL(putRecordResult.IsTransportError(), false); + UNIT_ASSERT_VALUES_EQUAL(putRecordResult.GetStatus(), EStatus::SUCCESS); + } Y_UNIT_TEST(TestPutRecordsWithRead) { TInsecureDatastreamsTestServer testServer; diff --git a/ydb/services/datastreams/put_records_actor.h b/ydb/services/datastreams/put_records_actor.h index 81184352c1e..6b5888a18b6 100644 --- a/ydb/services/datastreams/put_records_actor.h +++ b/ydb/services/datastreams/put_records_actor.h @@ -42,12 +42,13 @@ namespace NKikimr::NDataStreams::V1 { public: using TBase = TActorBootstrapped<TDatastreamsPartitionActor>; - TDatastreamsPartitionActor(NActors::TActorId parentId, ui64 tabletId, ui32 partition, const TString& topic, TVector<TPutRecordsItem> dataToWrite) + TDatastreamsPartitionActor(NActors::TActorId parentId, ui64 tabletId, ui32 partition, const TString& topic, TVector<TPutRecordsItem> dataToWrite, bool shouldBeCharged) : ParentId(std::move(parentId)) , TabletId(tabletId) , Partition(partition) , Topic(topic) , DataToWrite(std::move(dataToWrite)) + , ShouldBeCharged(shouldBeCharged) { } @@ -60,7 +61,8 @@ namespace NKikimr::NDataStreams::V1 { .BackoffMultiplier = 2, .DoFirstRetryInstantly = true }; - PipeClient = ctx.RegisterWithSameMailbox(NTabletPipe::CreateClient(ctx.SelfID, TabletId, clientConfig)); + PipeClient = ctx.RegisterWithSameMailbox( + NTabletPipe::CreateClient(ctx.SelfID, TabletId, clientConfig)); SendWriteRequest(ctx); Become(&TDatastreamsPartitionActor::PartitionWriteFunc); @@ -108,10 +110,13 @@ namespace NKikimr::NDataStreams::V1 { w->SetExternalOperation(true); totalSize += (item.Data.size() + item.Key.size() + item.ExplicitHash.size()); } - ui64 putUnitsCount = totalSize / PUT_UNIT_SIZE; - if (totalSize % PUT_UNIT_SIZE != 0) - putUnitsCount++; - request.MutablePartitionRequest()->SetPutUnitsSize(putUnitsCount); + + if (ShouldBeCharged) { + ui64 putUnitsCount = totalSize / PUT_UNIT_SIZE; + if (totalSize % PUT_UNIT_SIZE != 0) + putUnitsCount++; + request.MutablePartitionRequest()->SetPutUnitsSize(putUnitsCount); + } TAutoPtr<TEvPersQueue::TEvRequest> req(new TEvPersQueue::TEvRequest); req->Record.Swap(&request); @@ -169,6 +174,7 @@ namespace NKikimr::NDataStreams::V1 { TString Topic; TVector<TPutRecordsItem> DataToWrite; NActors::TActorId PipeClient; + bool ShouldBeCharged; }; //------------------------------------------------------------------------------------ @@ -231,12 +237,12 @@ namespace NKikimr::NDataStreams::V1 { Ydb::DataStreams::V1::PutRecordsResult PutRecordsResult; TString Ip; + bool ShouldBeCharged; void SendNavigateRequest(const TActorContext &ctx); - - void Handle(NDataStreams::V1::TEvDataStreams::TEvPartitionActorResult::TPtr& ev, const TActorContext& ctx); + void Handle(NDataStreams::V1::TEvDataStreams::TEvPartitionActorResult::TPtr& ev, + const TActorContext& ctx); void Handle(TEvents::TEvWakeup::TPtr& ev, const TActorContext& ctx); - void CheckFinish(const TActorContext& ctx); STFUNC(StateFunc) { @@ -261,15 +267,26 @@ namespace NKikimr::NDataStreams::V1 { TString error = CheckRequestIsValid(static_cast<TDerived*>(this)->GetPutRecordsRequest()); if (!error.empty()) { - return this->ReplyWithError(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::BAD_REQUEST, error, ctx); + return this->ReplyWithError(Ydb::StatusIds::BAD_REQUEST, + Ydb::PersQueue::ErrorCode::BAD_REQUEST, + error, ctx); } if (this->Request_->GetInternalToken().empty()) { if (AppData(ctx)->PQConfig.GetRequireCredentialsInNewProtocol()) { - return this->ReplyWithError(Ydb::StatusIds::UNAUTHORIZED, Ydb::PersQueue::ErrorCode::ACCESS_DENIED, - "Request without authentication are not allowed", ctx); + return this->ReplyWithError(Ydb::StatusIds::UNAUTHORIZED, + Ydb::PersQueue::ErrorCode::ACCESS_DENIED, + TStringBuilder() << "Access to stream " + << this->GetProtoRequest()->stream_name() + << " is denied", ctx); } } + NACLib::TUserToken token(this->Request_->GetInternalToken()); + + ShouldBeCharged = std::find( + AppData(ctx)->PQConfig.GetNonChargeableUser().begin(), + AppData(ctx)->PQConfig.GetNonChargeableUser().end(), + token.GetUserSID()) != AppData(ctx)->PQConfig.GetNonChargeableUser().end(); SendNavigateRequest(ctx); this->Become(&TPutRecordsActorBase<TDerived, TProto>::StateFunc); @@ -295,13 +312,14 @@ namespace NKikimr::NDataStreams::V1 { const NSchemeCache::TSchemeCacheNavigate* navigate = ev->Get()->Request.Get(); auto topicInfo = navigate->ResultSet.begin(); if (AppData(ctx)->PQConfig.GetRequireCredentialsInNewProtocol()) { - if (!topicInfo->SecurityObject->CheckAccess(NACLib::EAccessRights::UpdateRow, - this->Request_->GetInternalToken())) { - return this->ReplyWithError(Ydb::StatusIds::UNAUTHORIZED, Ydb::PersQueue::ErrorCode::ACCESS_DENIED, + NACLib::TUserToken token(this->Request_->GetInternalToken()); + if (!topicInfo->SecurityObject->CheckAccess(NACLib::EAccessRights::UpdateRow, token)) { + return this->ReplyWithError(Ydb::StatusIds::UNAUTHORIZED, + Ydb::PersQueue::ErrorCode::ACCESS_DENIED, TStringBuilder() << "Access for stream " << this->GetProtoRequest()->stream_name() << " is denied for subject " - << this->Request_->GetInternalToken(), ctx); + << token.GetUserSID(), ctx); } } @@ -317,8 +335,9 @@ namespace NKikimr::NDataStreams::V1 { auto part = partition.GetPartitionId(); if (items[part].empty()) continue; PartitionToActor[part].ActorId = ctx.Register( - new TDatastreamsPartitionActor(ctx.SelfID, partition.GetTabletId(), part, this->GetTopicPath(ctx), std::move(items[part])) - ); + new TDatastreamsPartitionActor(ctx.SelfID, partition.GetTabletId(), part, + this->GetTopicPath(ctx), std::move(items[part]), + ShouldBeCharged)); } this->CheckFinish(ctx); } |