summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authormokhotskii <[email protected]>2022-03-31 10:58:45 +0300
committermokhotskii <[email protected]>2022-03-31 10:58:45 +0300
commit6e3564f0fc6d9329f2f36fd10479488bc3876645 (patch)
treeeb02b68a2958e11d150e3a3f283cfbcb3eda5c13
parenta94418dd6eb3881482c83a06e5477fb6be2c8571 (diff)
CLOUD-95800 Add list of non chargeable users for YDS
Add list of non chargeable users in YDS PQConfig ref:8c88b367968ae05c8b4bcc177b943489ab593efd
-rw-r--r--ydb/core/protos/pqconfig.proto2
-rw-r--r--ydb/services/datastreams/datastreams_ut.cpp83
-rw-r--r--ydb/services/datastreams/put_records_actor.h55
3 files changed, 112 insertions, 28 deletions
diff --git a/ydb/core/protos/pqconfig.proto b/ydb/core/protos/pqconfig.proto
index e0343e21409..4c878a48f69 100644
--- a/ydb/core/protos/pqconfig.proto
+++ b/ydb/core/protos/pqconfig.proto
@@ -142,6 +142,8 @@ message TPQConfig {
optional uint32 MetaCacheRefreshIntervalMilliSeconds = 41 [default = 10000];
optional bool MetaCacheSkipVersionCheck = 42 [default = false];
+
+ repeated string NonChargeableUser = 44;
}
message TChannelProfile {
diff --git a/ydb/services/datastreams/datastreams_ut.cpp b/ydb/services/datastreams/datastreams_ut.cpp
index 27847e1aceb..d1ccb6f619a 100644
--- a/ydb/services/datastreams/datastreams_ut.cpp
+++ b/ydb/services/datastreams/datastreams_ut.cpp
@@ -28,6 +28,9 @@ struct WithSslAndAuth : TKikimrTestSettings {
};
using TKikimrWithGrpcAndRootSchemaSecure = NYdb::TBasicKikimrWithGrpcAndRootSchema<WithSslAndAuth>;
+static constexpr const char NON_CHARGEABLE_USER[] = "superuser@bultin";
+static constexpr const char NON_CHARGEABLE_USER_X[] = "superuser_x@bultin";
+static constexpr const char NON_CHARGEABLE_USER_Y[] = "superuser_y@bultin";
template<class TKikimr, bool secure>
class TDatastreamsTestServer {
@@ -43,6 +46,9 @@ public:
appConfig.MutablePQConfig()->MutableBillingMeteringConfig()->SetEnabled(true);
appConfig.MutablePQConfig()->MutableBillingMeteringConfig()->SetFlushIntervalSec(1);
appConfig.MutablePQConfig()->AddClientServiceType()->SetName("data-streams");
+ appConfig.MutablePQConfig()->AddNonChargeableUser(NON_CHARGEABLE_USER);
+ appConfig.MutablePQConfig()->AddNonChargeableUser(NON_CHARGEABLE_USER_X);
+ appConfig.MutablePQConfig()->AddNonChargeableUser(NON_CHARGEABLE_USER_Y);
MeteringFile = MakeHolder<TTempFileHandle>("meteringData.txt");
appConfig.MutableMeteringConfig()->SetMeteringFilePath(MeteringFile->Name());
@@ -92,7 +98,8 @@ public:
using TInsecureDatastreamsTestServer = TDatastreamsTestServer<TKikimrWithGrpcAndRootSchema, false>;
using TSecureDatastreamsTestServer = TDatastreamsTestServer<TKikimrWithGrpcAndRootSchemaSecure, true>;
-void CheckMeteringFile(TTempFileHandle* meteringFile, const TString& streamPath) {
+void CheckMeteringFile(TTempFileHandle* meteringFile, const TString& streamPath,
+ std::function<void(const NJson::TJsonValue::TMapType& map)> usage_check) {
Sleep(TDuration::Seconds(1));
meteringFile->Flush();
meteringFile->Close();
@@ -121,9 +128,7 @@ void CheckMeteringFile(TTempFileHandle* meteringFile, const TString& streamPath)
if (!tags.empty()) {
UNIT_ASSERT_VALUES_EQUAL(tags.size(), 3);
}
- UNIT_ASSERT(map.contains("usage"));
- auto& usage = map.find("usage")->second.GetMap();
- UNIT_ASSERT(usage.find("quantity")->second.GetInteger() >= 0);
+ usage_check(map);
}
UNIT_ASSERT(totalLines >= 2);
}
@@ -131,6 +136,14 @@ void CheckMeteringFile(TTempFileHandle* meteringFile, const TString& streamPath)
#define Y_UNIT_TEST_NAME this->Name_;
+
+#define SET_YDS_LOCALS \
+ auto& kikimr = testServer.KikimrServer->Server_; \
+ Y_UNUSED(kikimr); \
+ auto& driver = testServer.Driver; \
+ Y_UNUSED(driver); \
+
+
Y_UNIT_TEST_SUITE(DataStreams) {
Y_UNIT_TEST(TestControlPlaneAndMeteringData) {
@@ -242,7 +255,43 @@ Y_UNIT_TEST_SUITE(DataStreams) {
UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SCHEME_ERROR);
}
- CheckMeteringFile(testServer.MeteringFile.Get(), "/Root/" + streamName);
+ CheckMeteringFile(testServer.MeteringFile.Get(), "/Root/" + streamName,
+ [](const NJson::TJsonValue::TMapType& map) {
+ UNIT_ASSERT(map.contains("usage"));
+ auto& usage = map.find("usage")->second.GetMap();
+ UNIT_ASSERT(usage.find("quantity")->second.GetInteger() >= 0);
+ });
+ }
+
+ Y_UNIT_TEST(TestNonChargeableUser) {
+ TInsecureDatastreamsTestServer testServer;
+ const TString streamName = TStringBuilder() << "stream_" << Y_UNIT_TEST_NAME;
+ SET_YDS_LOCALS;
+ NYDS_V1::TDataStreamsClient client(*driver, TCommonClientSettings().AuthToken(NON_CHARGEABLE_USER));
+
+ {
+ auto result = client.CreateStream(streamName,
+ NYDS_V1::TCreateStreamSettings().ShardCount(3)).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
+ UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS);
+ }
+
+ { // for metering purposes
+ std::vector<NYDS_V1::TDataRecord> records;
+ for (ui32 i = 1; i <= 30; ++i) {
+ TString data = Sprintf("%04u", i);
+ records.push_back({data, data, ""});
+ }
+ auto result = client.PutRecords(streamName, records).ExtractValueSync();
+ Cerr << result.GetResult().DebugString() << Endl;
+ UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
+ UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS);
+ }
+
+ Sleep(TDuration::Seconds(1));
+ testServer.MeteringFile.Get()->Flush();
+ UNIT_ASSERT_VALUES_EQUAL(testServer.MeteringFile.Get()->GetLength(), 0);
+ testServer.MeteringFile.Get()->Close();
}
Y_UNIT_TEST(TestCreateExistingStream) {
@@ -516,13 +565,27 @@ Y_UNIT_TEST_SUITE(DataStreams) {
}
+ Y_UNIT_TEST(TestPutRecordsOfAnauthorizedUser) {
+ TInsecureDatastreamsTestServer testServer;
+ const TString streamName = TStringBuilder() << "stream_" << Y_UNIT_TEST_NAME;
+ SET_YDS_LOCALS;
+ {
+ auto result = testServer.DataStreamsClient->CreateStream(streamName,
+ NYDS_V1::TCreateStreamSettings().ShardCount(5)).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
+ UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS);
+ }
+
+ NYDS_V1::TDataStreamsClient client(*driver, TCommonClientSettings().AuthToken(""));
-#define SET_YDS_LOCALS \
- auto& kikimr = testServer.KikimrServer->Server_; \
- Y_UNUSED(kikimr); \
- auto& driver = testServer.Driver; \
- Y_UNUSED(driver); \
+ const TString dataStr = "9876543210";
+ auto putRecordResult = client.PutRecord("/Root/" + streamName, {dataStr, dataStr, dataStr}).ExtractValueSync();
+ Cerr << putRecordResult.GetResult().DebugString() << Endl;
+
+ UNIT_ASSERT_VALUES_EQUAL(putRecordResult.IsTransportError(), false);
+ UNIT_ASSERT_VALUES_EQUAL(putRecordResult.GetStatus(), EStatus::SUCCESS);
+ }
Y_UNIT_TEST(TestPutRecordsWithRead) {
TInsecureDatastreamsTestServer testServer;
diff --git a/ydb/services/datastreams/put_records_actor.h b/ydb/services/datastreams/put_records_actor.h
index 81184352c1e..6b5888a18b6 100644
--- a/ydb/services/datastreams/put_records_actor.h
+++ b/ydb/services/datastreams/put_records_actor.h
@@ -42,12 +42,13 @@ namespace NKikimr::NDataStreams::V1 {
public:
using TBase = TActorBootstrapped<TDatastreamsPartitionActor>;
- TDatastreamsPartitionActor(NActors::TActorId parentId, ui64 tabletId, ui32 partition, const TString& topic, TVector<TPutRecordsItem> dataToWrite)
+ TDatastreamsPartitionActor(NActors::TActorId parentId, ui64 tabletId, ui32 partition, const TString& topic, TVector<TPutRecordsItem> dataToWrite, bool shouldBeCharged)
: ParentId(std::move(parentId))
, TabletId(tabletId)
, Partition(partition)
, Topic(topic)
, DataToWrite(std::move(dataToWrite))
+ , ShouldBeCharged(shouldBeCharged)
{
}
@@ -60,7 +61,8 @@ namespace NKikimr::NDataStreams::V1 {
.BackoffMultiplier = 2,
.DoFirstRetryInstantly = true
};
- PipeClient = ctx.RegisterWithSameMailbox(NTabletPipe::CreateClient(ctx.SelfID, TabletId, clientConfig));
+ PipeClient = ctx.RegisterWithSameMailbox(
+ NTabletPipe::CreateClient(ctx.SelfID, TabletId, clientConfig));
SendWriteRequest(ctx);
Become(&TDatastreamsPartitionActor::PartitionWriteFunc);
@@ -108,10 +110,13 @@ namespace NKikimr::NDataStreams::V1 {
w->SetExternalOperation(true);
totalSize += (item.Data.size() + item.Key.size() + item.ExplicitHash.size());
}
- ui64 putUnitsCount = totalSize / PUT_UNIT_SIZE;
- if (totalSize % PUT_UNIT_SIZE != 0)
- putUnitsCount++;
- request.MutablePartitionRequest()->SetPutUnitsSize(putUnitsCount);
+
+ if (ShouldBeCharged) {
+ ui64 putUnitsCount = totalSize / PUT_UNIT_SIZE;
+ if (totalSize % PUT_UNIT_SIZE != 0)
+ putUnitsCount++;
+ request.MutablePartitionRequest()->SetPutUnitsSize(putUnitsCount);
+ }
TAutoPtr<TEvPersQueue::TEvRequest> req(new TEvPersQueue::TEvRequest);
req->Record.Swap(&request);
@@ -169,6 +174,7 @@ namespace NKikimr::NDataStreams::V1 {
TString Topic;
TVector<TPutRecordsItem> DataToWrite;
NActors::TActorId PipeClient;
+ bool ShouldBeCharged;
};
//------------------------------------------------------------------------------------
@@ -231,12 +237,12 @@ namespace NKikimr::NDataStreams::V1 {
Ydb::DataStreams::V1::PutRecordsResult PutRecordsResult;
TString Ip;
+ bool ShouldBeCharged;
void SendNavigateRequest(const TActorContext &ctx);
-
- void Handle(NDataStreams::V1::TEvDataStreams::TEvPartitionActorResult::TPtr& ev, const TActorContext& ctx);
+ void Handle(NDataStreams::V1::TEvDataStreams::TEvPartitionActorResult::TPtr& ev,
+ const TActorContext& ctx);
void Handle(TEvents::TEvWakeup::TPtr& ev, const TActorContext& ctx);
-
void CheckFinish(const TActorContext& ctx);
STFUNC(StateFunc) {
@@ -261,15 +267,26 @@ namespace NKikimr::NDataStreams::V1 {
TString error = CheckRequestIsValid(static_cast<TDerived*>(this)->GetPutRecordsRequest());
if (!error.empty()) {
- return this->ReplyWithError(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::BAD_REQUEST, error, ctx);
+ return this->ReplyWithError(Ydb::StatusIds::BAD_REQUEST,
+ Ydb::PersQueue::ErrorCode::BAD_REQUEST,
+ error, ctx);
}
if (this->Request_->GetInternalToken().empty()) {
if (AppData(ctx)->PQConfig.GetRequireCredentialsInNewProtocol()) {
- return this->ReplyWithError(Ydb::StatusIds::UNAUTHORIZED, Ydb::PersQueue::ErrorCode::ACCESS_DENIED,
- "Request without authentication are not allowed", ctx);
+ return this->ReplyWithError(Ydb::StatusIds::UNAUTHORIZED,
+ Ydb::PersQueue::ErrorCode::ACCESS_DENIED,
+ TStringBuilder() << "Access to stream "
+ << this->GetProtoRequest()->stream_name()
+ << " is denied", ctx);
}
}
+ NACLib::TUserToken token(this->Request_->GetInternalToken());
+
+ ShouldBeCharged = std::find(
+ AppData(ctx)->PQConfig.GetNonChargeableUser().begin(),
+ AppData(ctx)->PQConfig.GetNonChargeableUser().end(),
+ token.GetUserSID()) != AppData(ctx)->PQConfig.GetNonChargeableUser().end();
SendNavigateRequest(ctx);
this->Become(&TPutRecordsActorBase<TDerived, TProto>::StateFunc);
@@ -295,13 +312,14 @@ namespace NKikimr::NDataStreams::V1 {
const NSchemeCache::TSchemeCacheNavigate* navigate = ev->Get()->Request.Get();
auto topicInfo = navigate->ResultSet.begin();
if (AppData(ctx)->PQConfig.GetRequireCredentialsInNewProtocol()) {
- if (!topicInfo->SecurityObject->CheckAccess(NACLib::EAccessRights::UpdateRow,
- this->Request_->GetInternalToken())) {
- return this->ReplyWithError(Ydb::StatusIds::UNAUTHORIZED, Ydb::PersQueue::ErrorCode::ACCESS_DENIED,
+ NACLib::TUserToken token(this->Request_->GetInternalToken());
+ if (!topicInfo->SecurityObject->CheckAccess(NACLib::EAccessRights::UpdateRow, token)) {
+ return this->ReplyWithError(Ydb::StatusIds::UNAUTHORIZED,
+ Ydb::PersQueue::ErrorCode::ACCESS_DENIED,
TStringBuilder() << "Access for stream "
<< this->GetProtoRequest()->stream_name()
<< " is denied for subject "
- << this->Request_->GetInternalToken(), ctx);
+ << token.GetUserSID(), ctx);
}
}
@@ -317,8 +335,9 @@ namespace NKikimr::NDataStreams::V1 {
auto part = partition.GetPartitionId();
if (items[part].empty()) continue;
PartitionToActor[part].ActorId = ctx.Register(
- new TDatastreamsPartitionActor(ctx.SelfID, partition.GetTabletId(), part, this->GetTopicPath(ctx), std::move(items[part]))
- );
+ new TDatastreamsPartitionActor(ctx.SelfID, partition.GetTabletId(), part,
+ this->GetTopicPath(ctx), std::move(items[part]),
+ ShouldBeCharged));
}
this->CheckFinish(ctx);
}