diff options
author | mokhotskii <mokhotskii@yandex-team.ru> | 2022-06-06 14:52:32 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-06-06 14:52:32 +0300 |
commit | 9fb7cf183a555e64fbdf203a26dcd4a973ad383c (patch) | |
tree | 49b684d67a51e490dd683089d93ea80e59ddd6d1 | |
parent | f69185e2f55ceae1a27ca243e3fe240489c3e3cb (diff) | |
download | ydb-9fb7cf183a555e64fbdf203a26dcd4a973ad383c.tar.gz |
[merge to 22-2] Fix defining if user should be charged in YDS for put units
CLOUD_95800 Fix defining if user should be charged in YDS for put units
Fix defining if user should be charged in YDS for put units
REVIEW: 2522665
REVIEW: 2523795
x-ydb-stable-ref: 7e24a3cdaf14fe296fc683936bc0c0b53189da1d
-rw-r--r-- | ydb/services/datastreams/datastreams_ut.cpp | 145 | ||||
-rw-r--r-- | ydb/services/datastreams/put_records_actor.h | 2 |
2 files changed, 119 insertions, 28 deletions
diff --git a/ydb/services/datastreams/datastreams_ut.cpp b/ydb/services/datastreams/datastreams_ut.cpp index 64f5705ccd..8a788225ff 100644 --- a/ydb/services/datastreams/datastreams_ut.cpp +++ b/ydb/services/datastreams/datastreams_ut.cpp @@ -28,9 +28,9 @@ struct WithSslAndAuth : TKikimrTestSettings { }; using TKikimrWithGrpcAndRootSchemaSecure = NYdb::TBasicKikimrWithGrpcAndRootSchema<WithSslAndAuth>; -static constexpr const char NON_CHARGEABLE_USER[] = "superuser@bultin"; -static constexpr const char NON_CHARGEABLE_USER_X[] = "superuser_x@bultin"; -static constexpr const char NON_CHARGEABLE_USER_Y[] = "superuser_y@bultin"; +static constexpr const char NON_CHARGEABLE_USER[] = "superuser@builtin"; +static constexpr const char NON_CHARGEABLE_USER_X[] = "superuser_x@builtin"; +static constexpr const char NON_CHARGEABLE_USER_Y[] = "superuser_y@builtin"; template<class TKikimr, bool secure> class TDatastreamsTestServer { @@ -98,20 +98,31 @@ public: using TInsecureDatastreamsTestServer = TDatastreamsTestServer<TKikimrWithGrpcAndRootSchema, false>; using TSecureDatastreamsTestServer = TDatastreamsTestServer<TKikimrWithGrpcAndRootSchemaSecure, true>; -void CheckMeteringFile(TTempFileHandle* meteringFile, const TString& streamPath, +bool CheckMeteringFile(TTempFileHandle* meteringFile, const TString& streamPath, const TString& schema, + std::function<void(const NJson::TJsonValue::TMapType& map)> tags_check, + std::function<void(const NJson::TJsonValue::TMapType& map)> labels_check, std::function<void(const NJson::TJsonValue::TMapType& map)> usage_check) { Sleep(TDuration::Seconds(1)); - meteringFile->Flush(); - meteringFile->Close(); + if (meteringFile->IsOpen()) { + meteringFile->Flush(); + meteringFile->Close(); + } auto input = TFileInput(TFile(meteringFile->Name(), RdOnly | OpenExisting)); ui64 totalLines = 0; TString line; + bool schemaFound = false; while(input.ReadLine(line)) { totalLines++; Cerr << "Got line from metering file data: '" << line << "'" << Endl; NJson::TJsonValue json; NJson::ReadJsonTree(line, &json, true); auto& map = json.GetMap(); + UNIT_ASSERT(map.contains("schema")); + if (map.find("schema")->second.GetString() == schema) { + schemaFound = true; + } else { + continue; + } UNIT_ASSERT(map.contains("cloud_id")); UNIT_ASSERT(map.contains("folder_id")); UNIT_ASSERT(map.contains("resource_id")); @@ -124,13 +135,11 @@ void CheckMeteringFile(TTempFileHandle* meteringFile, const TString& streamPath, UNIT_ASSERT(map.find("cloud_id")->second.GetString() == "somecloud"); UNIT_ASSERT(map.find("folder_id")->second.GetString() == "somefolder"); UNIT_ASSERT(map.find("resource_id")->second.GetString() == streamPath); - auto& tags = map.find("tags")->second.GetMap(); - if (!tags.empty()) { - UNIT_ASSERT_VALUES_EQUAL(tags.size(), 3); - } + tags_check(map); + labels_check(map); usage_check(map); } - UNIT_ASSERT(totalLines >= 2); + return schemaFound; } @@ -241,7 +250,6 @@ Y_UNIT_TEST_SUITE(DataStreams) { UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS); } - // now when stream is created delete should work fine { auto result = testServer.DataStreamsClient->DeleteStream(streamName).ExtractValueSync(); @@ -255,25 +263,76 @@ Y_UNIT_TEST_SUITE(DataStreams) { UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false); UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SCHEME_ERROR); } - CheckMeteringFile(testServer.MeteringFile.Get(), "/Root/" + streamName, + auto putUnitsSchemaFound = + CheckMeteringFile(testServer.MeteringFile.Get(), "/Root/" + streamName, "yds.events.puts.v1", + [](const NJson::TJsonValue::TMapType& map) { + UNIT_ASSERT(map.contains("tags")); + auto& tags = map.find("tags")->second.GetMap(); + UNIT_ASSERT_VALUES_EQUAL(tags.size(), 0); + }, + [streamName](const NJson::TJsonValue::TMapType& map) { + UNIT_ASSERT(map.contains("labels")); + auto& labels = map.find("labels")->second.GetMap(); + UNIT_ASSERT_VALUES_EQUAL( + labels.find("datastreams_stream_name")->second.GetString(), streamName); + UNIT_ASSERT_VALUES_EQUAL( + labels.find("ydb_database")->second.GetString(), "root"); + }, + [](const NJson::TJsonValue::TMapType& map) { + UNIT_ASSERT(map.contains("usage")); + auto& usage = map.find("usage")->second.GetMap(); + UNIT_ASSERT(usage.find("quantity")->second.GetInteger() >= 0); + UNIT_ASSERT_VALUES_EQUAL(usage.find("unit")->second.GetString(), "put_events"); + }); + UNIT_ASSERT_VALUES_EQUAL(putUnitsSchemaFound, true); + + auto resourcesReservedSchemaFound = + CheckMeteringFile(testServer.MeteringFile.Get(), "/Root/" + streamName, "yds.resources.reserved.v1", + [](const NJson::TJsonValue::TMapType& map) { + UNIT_ASSERT(map.contains("tags")); + auto& tags = map.find("tags")->second.GetMap(); + UNIT_ASSERT(tags.contains("reserved_throughput_bps")); + UNIT_ASSERT(tags.contains("shard_enhanced_consumers_throughput")); + UNIT_ASSERT(tags.contains("reserved_storage_bytes")); + }, + [streamName](const NJson::TJsonValue::TMapType& map) { + UNIT_ASSERT(map.contains("labels")); + auto& labels = map.find("labels")->second.GetMap(); + UNIT_ASSERT_VALUES_EQUAL( + labels.find("datastreams_stream_name")->second.GetString(), streamName); + UNIT_ASSERT_VALUES_EQUAL( + labels.find("ydb_database")->second.GetString(), "root"); + }, [](const NJson::TJsonValue::TMapType& map) { UNIT_ASSERT(map.contains("usage")); auto& usage = map.find("usage")->second.GetMap(); UNIT_ASSERT(usage.find("quantity")->second.GetInteger() >= 0); + UNIT_ASSERT_VALUES_EQUAL(usage.find("unit")->second.GetString(), "second"); }); + UNIT_ASSERT_VALUES_EQUAL(resourcesReservedSchemaFound, true); } Y_UNIT_TEST(TestNonChargeableUser) { - TInsecureDatastreamsTestServer testServer; + TSecureDatastreamsTestServer testServer; const TString streamName = TStringBuilder() << "stream_" << Y_UNIT_TEST_NAME; SET_YDS_LOCALS; - NYDS_V1::TDataStreamsClient client(*driver, TCommonClientSettings().AuthToken(NON_CHARGEABLE_USER)); - + const TString streamPath = "/Root/" + streamName; { - auto result = client.CreateStream(streamName, - NYDS_V1::TCreateStreamSettings().ShardCount(3)).ExtractValueSync(); + auto result = testServer.DataStreamsClient->CreateStream(streamPath, + NYDS_V1::TCreateStreamSettings().ShardCount(1)).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false); - UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + + NYDS_V1::TDataStreamsClient client(*driver, TCommonClientSettings().AuthToken(NON_CHARGEABLE_USER)); + NYdb::NScheme::TSchemeClient schemeClient(*driver); + { + NYdb::NScheme::TPermissions permissions(NON_CHARGEABLE_USER, + {"ydb.generic.read", "ydb.generic.write"}); + auto result = schemeClient.ModifyPermissions(streamPath, + NYdb::NScheme::TModifyPermissionsSettings().AddGrantPermissions(permissions) + ).ExtractValueSync(); + UNIT_ASSERT(result.IsSuccess()); } { // for metering purposes @@ -282,16 +341,49 @@ Y_UNIT_TEST_SUITE(DataStreams) { TString data = Sprintf("%04u", i); records.push_back({data, data, ""}); } - auto result = client.PutRecords(streamName, records).ExtractValueSync(); + auto result = client.PutRecords(streamPath, records).ExtractValueSync(); Cerr << result.GetResult().DebugString() << Endl; UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false); UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS); } - Sleep(TDuration::Seconds(1)); - testServer.MeteringFile.Get()->Flush(); - UNIT_ASSERT_VALUES_EQUAL(testServer.MeteringFile.Get()->GetLength(), 0); - testServer.MeteringFile.Get()->Close(); + { + auto result = testServer.DataStreamsClient->DeleteStream(streamPath).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false); + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS); + } + + auto putUnitsSchemaFound = + CheckMeteringFile(testServer.MeteringFile.Get(), streamPath, "yds.events.puts.v1", + [](const NJson::TJsonValue::TMapType&) {}, + [](const NJson::TJsonValue::TMapType&) {}, + [](const NJson::TJsonValue::TMapType&) {}); + UNIT_ASSERT_VALUES_EQUAL(putUnitsSchemaFound, false); + + auto resourcesReservedSchemaFound = + CheckMeteringFile(testServer.MeteringFile.Get(), streamPath, "yds.resources.reserved.v1", + [](const NJson::TJsonValue::TMapType& map) { + UNIT_ASSERT(map.contains("tags")); + auto& tags = map.find("tags")->second.GetMap(); + UNIT_ASSERT(tags.contains("reserved_throughput_bps")); + UNIT_ASSERT(tags.contains("shard_enhanced_consumers_throughput")); + UNIT_ASSERT(tags.contains("reserved_storage_bytes")); + }, + [streamName](const NJson::TJsonValue::TMapType& map) { + UNIT_ASSERT(map.contains("labels")); + auto& labels = map.find("labels")->second.GetMap(); + UNIT_ASSERT_VALUES_EQUAL( + labels.find("datastreams_stream_name")->second.GetString(), streamName); + UNIT_ASSERT_VALUES_EQUAL( + labels.find("ydb_database")->second.GetString(), "root"); + }, + [](const NJson::TJsonValue::TMapType& map) { + UNIT_ASSERT(map.contains("usage")); + auto& usage = map.find("usage")->second.GetMap(); + UNIT_ASSERT(usage.find("quantity")->second.GetInteger() >= 0); + UNIT_ASSERT_VALUES_EQUAL(usage.find("unit")->second.GetString(), "second"); + }); + UNIT_ASSERT_VALUES_EQUAL(resourcesReservedSchemaFound, true); } Y_UNIT_TEST(TestCreateExistingStream) { @@ -580,9 +672,8 @@ Y_UNIT_TEST_SUITE(DataStreams) { const TString dataStr = "9876543210"; - auto putRecordResult = client.PutRecord("/Root/" + streamName, {dataStr, dataStr, dataStr}).ExtractValueSync(); - Cerr << putRecordResult.GetResult().DebugString() << Endl; - + auto putRecordResult = + client.PutRecord("/Root/" + streamName, {dataStr, dataStr, dataStr}).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL(putRecordResult.IsTransportError(), false); UNIT_ASSERT_VALUES_EQUAL(putRecordResult.GetStatus(), EStatus::SUCCESS); } diff --git a/ydb/services/datastreams/put_records_actor.h b/ydb/services/datastreams/put_records_actor.h index 8923dce1c6..c43c6f3024 100644 --- a/ydb/services/datastreams/put_records_actor.h +++ b/ydb/services/datastreams/put_records_actor.h @@ -284,7 +284,7 @@ namespace NKikimr::NDataStreams::V1 { ShouldBeCharged = std::find( AppData(ctx)->PQConfig.GetNonChargeableUser().begin(), AppData(ctx)->PQConfig.GetNonChargeableUser().end(), - token.GetUserSID()) != AppData(ctx)->PQConfig.GetNonChargeableUser().end(); + token.GetUserSID()) == AppData(ctx)->PQConfig.GetNonChargeableUser().end(); SendNavigateRequest(ctx); this->Become(&TPutRecordsActorBase<TDerived, TProto>::StateFunc); |