aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authormokhotskii <mokhotskii@yandex-team.ru>2022-06-06 14:52:32 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-06-06 14:52:32 +0300
commit9fb7cf183a555e64fbdf203a26dcd4a973ad383c (patch)
tree49b684d67a51e490dd683089d93ea80e59ddd6d1
parentf69185e2f55ceae1a27ca243e3fe240489c3e3cb (diff)
downloadydb-9fb7cf183a555e64fbdf203a26dcd4a973ad383c.tar.gz
[merge to 22-2] Fix defining if user should be charged in YDS for put units
CLOUD_95800 Fix defining if user should be charged in YDS for put units Fix defining if user should be charged in YDS for put units REVIEW: 2522665 REVIEW: 2523795 x-ydb-stable-ref: 7e24a3cdaf14fe296fc683936bc0c0b53189da1d
-rw-r--r--ydb/services/datastreams/datastreams_ut.cpp145
-rw-r--r--ydb/services/datastreams/put_records_actor.h2
2 files changed, 119 insertions, 28 deletions
diff --git a/ydb/services/datastreams/datastreams_ut.cpp b/ydb/services/datastreams/datastreams_ut.cpp
index 64f5705ccd..8a788225ff 100644
--- a/ydb/services/datastreams/datastreams_ut.cpp
+++ b/ydb/services/datastreams/datastreams_ut.cpp
@@ -28,9 +28,9 @@ struct WithSslAndAuth : TKikimrTestSettings {
};
using TKikimrWithGrpcAndRootSchemaSecure = NYdb::TBasicKikimrWithGrpcAndRootSchema<WithSslAndAuth>;
-static constexpr const char NON_CHARGEABLE_USER[] = "superuser@bultin";
-static constexpr const char NON_CHARGEABLE_USER_X[] = "superuser_x@bultin";
-static constexpr const char NON_CHARGEABLE_USER_Y[] = "superuser_y@bultin";
+static constexpr const char NON_CHARGEABLE_USER[] = "superuser@builtin";
+static constexpr const char NON_CHARGEABLE_USER_X[] = "superuser_x@builtin";
+static constexpr const char NON_CHARGEABLE_USER_Y[] = "superuser_y@builtin";
template<class TKikimr, bool secure>
class TDatastreamsTestServer {
@@ -98,20 +98,31 @@ public:
using TInsecureDatastreamsTestServer = TDatastreamsTestServer<TKikimrWithGrpcAndRootSchema, false>;
using TSecureDatastreamsTestServer = TDatastreamsTestServer<TKikimrWithGrpcAndRootSchemaSecure, true>;
-void CheckMeteringFile(TTempFileHandle* meteringFile, const TString& streamPath,
+bool CheckMeteringFile(TTempFileHandle* meteringFile, const TString& streamPath, const TString& schema,
+ std::function<void(const NJson::TJsonValue::TMapType& map)> tags_check,
+ std::function<void(const NJson::TJsonValue::TMapType& map)> labels_check,
std::function<void(const NJson::TJsonValue::TMapType& map)> usage_check) {
Sleep(TDuration::Seconds(1));
- meteringFile->Flush();
- meteringFile->Close();
+ if (meteringFile->IsOpen()) {
+ meteringFile->Flush();
+ meteringFile->Close();
+ }
auto input = TFileInput(TFile(meteringFile->Name(), RdOnly | OpenExisting));
ui64 totalLines = 0;
TString line;
+ bool schemaFound = false;
while(input.ReadLine(line)) {
totalLines++;
Cerr << "Got line from metering file data: '" << line << "'" << Endl;
NJson::TJsonValue json;
NJson::ReadJsonTree(line, &json, true);
auto& map = json.GetMap();
+ UNIT_ASSERT(map.contains("schema"));
+ if (map.find("schema")->second.GetString() == schema) {
+ schemaFound = true;
+ } else {
+ continue;
+ }
UNIT_ASSERT(map.contains("cloud_id"));
UNIT_ASSERT(map.contains("folder_id"));
UNIT_ASSERT(map.contains("resource_id"));
@@ -124,13 +135,11 @@ void CheckMeteringFile(TTempFileHandle* meteringFile, const TString& streamPath,
UNIT_ASSERT(map.find("cloud_id")->second.GetString() == "somecloud");
UNIT_ASSERT(map.find("folder_id")->second.GetString() == "somefolder");
UNIT_ASSERT(map.find("resource_id")->second.GetString() == streamPath);
- auto& tags = map.find("tags")->second.GetMap();
- if (!tags.empty()) {
- UNIT_ASSERT_VALUES_EQUAL(tags.size(), 3);
- }
+ tags_check(map);
+ labels_check(map);
usage_check(map);
}
- UNIT_ASSERT(totalLines >= 2);
+ return schemaFound;
}
@@ -241,7 +250,6 @@ Y_UNIT_TEST_SUITE(DataStreams) {
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS);
}
-
// now when stream is created delete should work fine
{
auto result = testServer.DataStreamsClient->DeleteStream(streamName).ExtractValueSync();
@@ -255,25 +263,76 @@ Y_UNIT_TEST_SUITE(DataStreams) {
UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SCHEME_ERROR);
}
- CheckMeteringFile(testServer.MeteringFile.Get(), "/Root/" + streamName,
+ auto putUnitsSchemaFound =
+ CheckMeteringFile(testServer.MeteringFile.Get(), "/Root/" + streamName, "yds.events.puts.v1",
+ [](const NJson::TJsonValue::TMapType& map) {
+ UNIT_ASSERT(map.contains("tags"));
+ auto& tags = map.find("tags")->second.GetMap();
+ UNIT_ASSERT_VALUES_EQUAL(tags.size(), 0);
+ },
+ [streamName](const NJson::TJsonValue::TMapType& map) {
+ UNIT_ASSERT(map.contains("labels"));
+ auto& labels = map.find("labels")->second.GetMap();
+ UNIT_ASSERT_VALUES_EQUAL(
+ labels.find("datastreams_stream_name")->second.GetString(), streamName);
+ UNIT_ASSERT_VALUES_EQUAL(
+ labels.find("ydb_database")->second.GetString(), "root");
+ },
+ [](const NJson::TJsonValue::TMapType& map) {
+ UNIT_ASSERT(map.contains("usage"));
+ auto& usage = map.find("usage")->second.GetMap();
+ UNIT_ASSERT(usage.find("quantity")->second.GetInteger() >= 0);
+ UNIT_ASSERT_VALUES_EQUAL(usage.find("unit")->second.GetString(), "put_events");
+ });
+ UNIT_ASSERT_VALUES_EQUAL(putUnitsSchemaFound, true);
+
+ auto resourcesReservedSchemaFound =
+ CheckMeteringFile(testServer.MeteringFile.Get(), "/Root/" + streamName, "yds.resources.reserved.v1",
+ [](const NJson::TJsonValue::TMapType& map) {
+ UNIT_ASSERT(map.contains("tags"));
+ auto& tags = map.find("tags")->second.GetMap();
+ UNIT_ASSERT(tags.contains("reserved_throughput_bps"));
+ UNIT_ASSERT(tags.contains("shard_enhanced_consumers_throughput"));
+ UNIT_ASSERT(tags.contains("reserved_storage_bytes"));
+ },
+ [streamName](const NJson::TJsonValue::TMapType& map) {
+ UNIT_ASSERT(map.contains("labels"));
+ auto& labels = map.find("labels")->second.GetMap();
+ UNIT_ASSERT_VALUES_EQUAL(
+ labels.find("datastreams_stream_name")->second.GetString(), streamName);
+ UNIT_ASSERT_VALUES_EQUAL(
+ labels.find("ydb_database")->second.GetString(), "root");
+ },
[](const NJson::TJsonValue::TMapType& map) {
UNIT_ASSERT(map.contains("usage"));
auto& usage = map.find("usage")->second.GetMap();
UNIT_ASSERT(usage.find("quantity")->second.GetInteger() >= 0);
+ UNIT_ASSERT_VALUES_EQUAL(usage.find("unit")->second.GetString(), "second");
});
+ UNIT_ASSERT_VALUES_EQUAL(resourcesReservedSchemaFound, true);
}
Y_UNIT_TEST(TestNonChargeableUser) {
- TInsecureDatastreamsTestServer testServer;
+ TSecureDatastreamsTestServer testServer;
const TString streamName = TStringBuilder() << "stream_" << Y_UNIT_TEST_NAME;
SET_YDS_LOCALS;
- NYDS_V1::TDataStreamsClient client(*driver, TCommonClientSettings().AuthToken(NON_CHARGEABLE_USER));
-
+ const TString streamPath = "/Root/" + streamName;
{
- auto result = client.CreateStream(streamName,
- NYDS_V1::TCreateStreamSettings().ShardCount(3)).ExtractValueSync();
+ auto result = testServer.DataStreamsClient->CreateStream(streamPath,
+ NYDS_V1::TCreateStreamSettings().ShardCount(1)).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
- UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS);
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+
+ NYDS_V1::TDataStreamsClient client(*driver, TCommonClientSettings().AuthToken(NON_CHARGEABLE_USER));
+ NYdb::NScheme::TSchemeClient schemeClient(*driver);
+ {
+ NYdb::NScheme::TPermissions permissions(NON_CHARGEABLE_USER,
+ {"ydb.generic.read", "ydb.generic.write"});
+ auto result = schemeClient.ModifyPermissions(streamPath,
+ NYdb::NScheme::TModifyPermissionsSettings().AddGrantPermissions(permissions)
+ ).ExtractValueSync();
+ UNIT_ASSERT(result.IsSuccess());
}
{ // for metering purposes
@@ -282,16 +341,49 @@ Y_UNIT_TEST_SUITE(DataStreams) {
TString data = Sprintf("%04u", i);
records.push_back({data, data, ""});
}
- auto result = client.PutRecords(streamName, records).ExtractValueSync();
+ auto result = client.PutRecords(streamPath, records).ExtractValueSync();
Cerr << result.GetResult().DebugString() << Endl;
UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS);
}
- Sleep(TDuration::Seconds(1));
- testServer.MeteringFile.Get()->Flush();
- UNIT_ASSERT_VALUES_EQUAL(testServer.MeteringFile.Get()->GetLength(), 0);
- testServer.MeteringFile.Get()->Close();
+ {
+ auto result = testServer.DataStreamsClient->DeleteStream(streamPath).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
+ UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS);
+ }
+
+ auto putUnitsSchemaFound =
+ CheckMeteringFile(testServer.MeteringFile.Get(), streamPath, "yds.events.puts.v1",
+ [](const NJson::TJsonValue::TMapType&) {},
+ [](const NJson::TJsonValue::TMapType&) {},
+ [](const NJson::TJsonValue::TMapType&) {});
+ UNIT_ASSERT_VALUES_EQUAL(putUnitsSchemaFound, false);
+
+ auto resourcesReservedSchemaFound =
+ CheckMeteringFile(testServer.MeteringFile.Get(), streamPath, "yds.resources.reserved.v1",
+ [](const NJson::TJsonValue::TMapType& map) {
+ UNIT_ASSERT(map.contains("tags"));
+ auto& tags = map.find("tags")->second.GetMap();
+ UNIT_ASSERT(tags.contains("reserved_throughput_bps"));
+ UNIT_ASSERT(tags.contains("shard_enhanced_consumers_throughput"));
+ UNIT_ASSERT(tags.contains("reserved_storage_bytes"));
+ },
+ [streamName](const NJson::TJsonValue::TMapType& map) {
+ UNIT_ASSERT(map.contains("labels"));
+ auto& labels = map.find("labels")->second.GetMap();
+ UNIT_ASSERT_VALUES_EQUAL(
+ labels.find("datastreams_stream_name")->second.GetString(), streamName);
+ UNIT_ASSERT_VALUES_EQUAL(
+ labels.find("ydb_database")->second.GetString(), "root");
+ },
+ [](const NJson::TJsonValue::TMapType& map) {
+ UNIT_ASSERT(map.contains("usage"));
+ auto& usage = map.find("usage")->second.GetMap();
+ UNIT_ASSERT(usage.find("quantity")->second.GetInteger() >= 0);
+ UNIT_ASSERT_VALUES_EQUAL(usage.find("unit")->second.GetString(), "second");
+ });
+ UNIT_ASSERT_VALUES_EQUAL(resourcesReservedSchemaFound, true);
}
Y_UNIT_TEST(TestCreateExistingStream) {
@@ -580,9 +672,8 @@ Y_UNIT_TEST_SUITE(DataStreams) {
const TString dataStr = "9876543210";
- auto putRecordResult = client.PutRecord("/Root/" + streamName, {dataStr, dataStr, dataStr}).ExtractValueSync();
- Cerr << putRecordResult.GetResult().DebugString() << Endl;
-
+ auto putRecordResult =
+ client.PutRecord("/Root/" + streamName, {dataStr, dataStr, dataStr}).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL(putRecordResult.IsTransportError(), false);
UNIT_ASSERT_VALUES_EQUAL(putRecordResult.GetStatus(), EStatus::SUCCESS);
}
diff --git a/ydb/services/datastreams/put_records_actor.h b/ydb/services/datastreams/put_records_actor.h
index 8923dce1c6..c43c6f3024 100644
--- a/ydb/services/datastreams/put_records_actor.h
+++ b/ydb/services/datastreams/put_records_actor.h
@@ -284,7 +284,7 @@ namespace NKikimr::NDataStreams::V1 {
ShouldBeCharged = std::find(
AppData(ctx)->PQConfig.GetNonChargeableUser().begin(),
AppData(ctx)->PQConfig.GetNonChargeableUser().end(),
- token.GetUserSID()) != AppData(ctx)->PQConfig.GetNonChargeableUser().end();
+ token.GetUserSID()) == AppData(ctx)->PQConfig.GetNonChargeableUser().end();
SendNavigateRequest(ctx);
this->Become(&TPutRecordsActorBase<TDerived, TProto>::StateFunc);