diff options
author | mokhotskii <mokhotskii@yandex-team.ru> | 2022-05-04 22:50:18 +0300 |
---|---|---|
committer | mokhotskii <mokhotskii@yandex-team.ru> | 2022-05-04 22:50:18 +0300 |
commit | da953649e9d87bcad124940f6c075283b187a1a9 (patch) | |
tree | 765b6ef71c1043783b4c6bac211304080e86246b | |
parent | 4a6a6bef37e1f38862da09706f217cd4a8bb0f38 (diff) | |
download | ydb-da953649e9d87bcad124940f6c075283b187a1a9.tar.gz |
CLOUD_95800 Fix defining if user should be charged in YDS for put units
Fix defining if user should be charged in YDS for put units
ref:6ad615ec48fa4e2cae7ac28c7e41c17080f2c704
-rw-r--r-- | ydb/services/datastreams/datastreams_ut.cpp | 145 | ||||
-rw-r--r-- | ydb/services/datastreams/put_records_actor.h | 2 |
2 files changed, 119 insertions, 28 deletions
diff --git a/ydb/services/datastreams/datastreams_ut.cpp b/ydb/services/datastreams/datastreams_ut.cpp index e56be9e494..70b712929d 100644 --- a/ydb/services/datastreams/datastreams_ut.cpp +++ b/ydb/services/datastreams/datastreams_ut.cpp @@ -28,9 +28,9 @@ struct WithSslAndAuth : TKikimrTestSettings { }; using TKikimrWithGrpcAndRootSchemaSecure = NYdb::TBasicKikimrWithGrpcAndRootSchema<WithSslAndAuth>; -static constexpr const char NON_CHARGEABLE_USER[] = "superuser@bultin"; -static constexpr const char NON_CHARGEABLE_USER_X[] = "superuser_x@bultin"; -static constexpr const char NON_CHARGEABLE_USER_Y[] = "superuser_y@bultin"; +static constexpr const char NON_CHARGEABLE_USER[] = "superuser@builtin"; +static constexpr const char NON_CHARGEABLE_USER_X[] = "superuser_x@builtin"; +static constexpr const char NON_CHARGEABLE_USER_Y[] = "superuser_y@builtin"; template<class TKikimr, bool secure> class TDatastreamsTestServer { @@ -97,20 +97,31 @@ public: using TInsecureDatastreamsTestServer = TDatastreamsTestServer<TKikimrWithGrpcAndRootSchema, false>; using TSecureDatastreamsTestServer = TDatastreamsTestServer<TKikimrWithGrpcAndRootSchemaSecure, true>; -void CheckMeteringFile(TTempFileHandle* meteringFile, const TString& streamPath, +bool CheckMeteringFile(TTempFileHandle* meteringFile, const TString& streamPath, const TString& schema, + std::function<void(const NJson::TJsonValue::TMapType& map)> tags_check, + std::function<void(const NJson::TJsonValue::TMapType& map)> labels_check, std::function<void(const NJson::TJsonValue::TMapType& map)> usage_check) { Sleep(TDuration::Seconds(1)); - meteringFile->Flush(); - meteringFile->Close(); + if (meteringFile->IsOpen()) { + meteringFile->Flush(); + meteringFile->Close(); + } auto input = TFileInput(TFile(meteringFile->Name(), RdOnly | OpenExisting)); ui64 totalLines = 0; TString line; + bool schemaFound = false; while(input.ReadLine(line)) { totalLines++; Cerr << "Got line from metering file data: '" << line << "'" << Endl; NJson::TJsonValue json; NJson::ReadJsonTree(line, &json, true); auto& map = json.GetMap(); + UNIT_ASSERT(map.contains("schema")); + if (map.find("schema")->second.GetString() == schema) { + schemaFound = true; + } else { + continue; + } UNIT_ASSERT(map.contains("cloud_id")); UNIT_ASSERT(map.contains("folder_id")); UNIT_ASSERT(map.contains("resource_id")); @@ -123,13 +134,11 @@ void CheckMeteringFile(TTempFileHandle* meteringFile, const TString& streamPath, UNIT_ASSERT(map.find("cloud_id")->second.GetString() == "somecloud"); UNIT_ASSERT(map.find("folder_id")->second.GetString() == "somefolder"); UNIT_ASSERT(map.find("resource_id")->second.GetString() == streamPath); - auto& tags = map.find("tags")->second.GetMap(); - if (!tags.empty()) { - UNIT_ASSERT_VALUES_EQUAL(tags.size(), 3); - } + tags_check(map); + labels_check(map); usage_check(map); } - UNIT_ASSERT(totalLines >= 2); + return schemaFound; } @@ -240,7 +249,6 @@ Y_UNIT_TEST_SUITE(DataStreams) { UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS); } - // now when stream is created delete should work fine { auto result = testServer.DataStreamsClient->DeleteStream(streamName).ExtractValueSync(); @@ -254,25 +262,76 @@ Y_UNIT_TEST_SUITE(DataStreams) { UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false); UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SCHEME_ERROR); } - CheckMeteringFile(testServer.MeteringFile.Get(), "/Root/" + streamName, + auto putUnitsSchemaFound = + CheckMeteringFile(testServer.MeteringFile.Get(), "/Root/" + streamName, "yds.events.puts.v1", + [](const NJson::TJsonValue::TMapType& map) { + UNIT_ASSERT(map.contains("tags")); + auto& tags = map.find("tags")->second.GetMap(); + UNIT_ASSERT_VALUES_EQUAL(tags.size(), 0); + }, + [streamName](const NJson::TJsonValue::TMapType& map) { + UNIT_ASSERT(map.contains("labels")); + auto& labels = map.find("labels")->second.GetMap(); + UNIT_ASSERT_VALUES_EQUAL( + labels.find("datastreams_stream_name")->second.GetString(), streamName); + UNIT_ASSERT_VALUES_EQUAL( + labels.find("ydb_database")->second.GetString(), "root"); + }, + [](const NJson::TJsonValue::TMapType& map) { + UNIT_ASSERT(map.contains("usage")); + auto& usage = map.find("usage")->second.GetMap(); + UNIT_ASSERT(usage.find("quantity")->second.GetInteger() >= 0); + UNIT_ASSERT_VALUES_EQUAL(usage.find("unit")->second.GetString(), "put_events"); + }); + UNIT_ASSERT_VALUES_EQUAL(putUnitsSchemaFound, true); + + auto resourcesReservedSchemaFound = + CheckMeteringFile(testServer.MeteringFile.Get(), "/Root/" + streamName, "yds.resources.reserved.v1", + [](const NJson::TJsonValue::TMapType& map) { + UNIT_ASSERT(map.contains("tags")); + auto& tags = map.find("tags")->second.GetMap(); + UNIT_ASSERT(tags.contains("reserved_throughput_bps")); + UNIT_ASSERT(tags.contains("shard_enhanced_consumers_throughput")); + UNIT_ASSERT(tags.contains("reserved_storage_bytes")); + }, + [streamName](const NJson::TJsonValue::TMapType& map) { + UNIT_ASSERT(map.contains("labels")); + auto& labels = map.find("labels")->second.GetMap(); + UNIT_ASSERT_VALUES_EQUAL( + labels.find("datastreams_stream_name")->second.GetString(), streamName); + UNIT_ASSERT_VALUES_EQUAL( + labels.find("ydb_database")->second.GetString(), "root"); + }, [](const NJson::TJsonValue::TMapType& map) { UNIT_ASSERT(map.contains("usage")); auto& usage = map.find("usage")->second.GetMap(); UNIT_ASSERT(usage.find("quantity")->second.GetInteger() >= 0); + UNIT_ASSERT_VALUES_EQUAL(usage.find("unit")->second.GetString(), "second"); }); + UNIT_ASSERT_VALUES_EQUAL(resourcesReservedSchemaFound, true); } Y_UNIT_TEST(TestNonChargeableUser) { - TInsecureDatastreamsTestServer testServer; + TSecureDatastreamsTestServer testServer; const TString streamName = TStringBuilder() << "stream_" << Y_UNIT_TEST_NAME; SET_YDS_LOCALS; - NYDS_V1::TDataStreamsClient client(*driver, TCommonClientSettings().AuthToken(NON_CHARGEABLE_USER)); - + const TString streamPath = "/Root/" + streamName; { - auto result = client.CreateStream(streamName, - NYDS_V1::TCreateStreamSettings().ShardCount(3)).ExtractValueSync(); + auto result = testServer.DataStreamsClient->CreateStream(streamPath, + NYDS_V1::TCreateStreamSettings().ShardCount(1)).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false); - UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + + NYDS_V1::TDataStreamsClient client(*driver, TCommonClientSettings().AuthToken(NON_CHARGEABLE_USER)); + NYdb::NScheme::TSchemeClient schemeClient(*driver); + { + NYdb::NScheme::TPermissions permissions(NON_CHARGEABLE_USER, + {"ydb.generic.read", "ydb.generic.write"}); + auto result = schemeClient.ModifyPermissions(streamPath, + NYdb::NScheme::TModifyPermissionsSettings().AddGrantPermissions(permissions) + ).ExtractValueSync(); + UNIT_ASSERT(result.IsSuccess()); } { // for metering purposes @@ -281,16 +340,49 @@ Y_UNIT_TEST_SUITE(DataStreams) { TString data = Sprintf("%04u", i); records.push_back({data, data, ""}); } - auto result = client.PutRecords(streamName, records).ExtractValueSync(); + auto result = client.PutRecords(streamPath, records).ExtractValueSync(); Cerr << result.GetResult().DebugString() << Endl; UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false); UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS); } - Sleep(TDuration::Seconds(1)); - testServer.MeteringFile.Get()->Flush(); - UNIT_ASSERT_VALUES_EQUAL(testServer.MeteringFile.Get()->GetLength(), 0); - testServer.MeteringFile.Get()->Close(); + { + auto result = testServer.DataStreamsClient->DeleteStream(streamPath).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false); + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS); + } + + auto putUnitsSchemaFound = + CheckMeteringFile(testServer.MeteringFile.Get(), streamPath, "yds.events.puts.v1", + [](const NJson::TJsonValue::TMapType&) {}, + [](const NJson::TJsonValue::TMapType&) {}, + [](const NJson::TJsonValue::TMapType&) {}); + UNIT_ASSERT_VALUES_EQUAL(putUnitsSchemaFound, false); + + auto resourcesReservedSchemaFound = + CheckMeteringFile(testServer.MeteringFile.Get(), streamPath, "yds.resources.reserved.v1", + [](const NJson::TJsonValue::TMapType& map) { + UNIT_ASSERT(map.contains("tags")); + auto& tags = map.find("tags")->second.GetMap(); + UNIT_ASSERT(tags.contains("reserved_throughput_bps")); + UNIT_ASSERT(tags.contains("shard_enhanced_consumers_throughput")); + UNIT_ASSERT(tags.contains("reserved_storage_bytes")); + }, + [streamName](const NJson::TJsonValue::TMapType& map) { + UNIT_ASSERT(map.contains("labels")); + auto& labels = map.find("labels")->second.GetMap(); + UNIT_ASSERT_VALUES_EQUAL( + labels.find("datastreams_stream_name")->second.GetString(), streamName); + UNIT_ASSERT_VALUES_EQUAL( + labels.find("ydb_database")->second.GetString(), "root"); + }, + [](const NJson::TJsonValue::TMapType& map) { + UNIT_ASSERT(map.contains("usage")); + auto& usage = map.find("usage")->second.GetMap(); + UNIT_ASSERT(usage.find("quantity")->second.GetInteger() >= 0); + UNIT_ASSERT_VALUES_EQUAL(usage.find("unit")->second.GetString(), "second"); + }); + UNIT_ASSERT_VALUES_EQUAL(resourcesReservedSchemaFound, true); } Y_UNIT_TEST(TestCreateExistingStream) { @@ -579,9 +671,8 @@ Y_UNIT_TEST_SUITE(DataStreams) { const TString dataStr = "9876543210"; - auto putRecordResult = client.PutRecord("/Root/" + streamName, {dataStr, dataStr, dataStr}).ExtractValueSync(); - Cerr << putRecordResult.GetResult().DebugString() << Endl; - + auto putRecordResult = + client.PutRecord("/Root/" + streamName, {dataStr, dataStr, dataStr}).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL(putRecordResult.IsTransportError(), false); UNIT_ASSERT_VALUES_EQUAL(putRecordResult.GetStatus(), EStatus::SUCCESS); } diff --git a/ydb/services/datastreams/put_records_actor.h b/ydb/services/datastreams/put_records_actor.h index f217abbb6e..cc8e201b95 100644 --- a/ydb/services/datastreams/put_records_actor.h +++ b/ydb/services/datastreams/put_records_actor.h @@ -284,7 +284,7 @@ namespace NKikimr::NDataStreams::V1 { ShouldBeCharged = std::find( AppData(ctx)->PQConfig.GetNonChargeableUser().begin(), AppData(ctx)->PQConfig.GetNonChargeableUser().end(), - token.GetUserSID()) != AppData(ctx)->PQConfig.GetNonChargeableUser().end(); + token.GetUserSID()) == AppData(ctx)->PQConfig.GetNonChargeableUser().end(); SendNavigateRequest(ctx); this->Become(&TPutRecordsActorBase<TDerived, TProto>::StateFunc); |