summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorilnaz <[email protected]>2023-01-25 10:14:43 +0300
committerilnaz <[email protected]>2023-01-25 10:14:43 +0300
commit09d191f3a27078f10212a016b4219e3734a12975 (patch)
tree7d720141dd94b2787be820364c8be86cc67c7069
parent0974b4afdb8f7e2fa45d54f35298176cff07f0a3 (diff)
Metering
-rw-r--r--ydb/core/protos/tx_datashard.proto6
-rw-r--r--ydb/core/tx/datashard/cdc_stream_scan.cpp45
-rw-r--r--ydb/core/tx/datashard/cdc_stream_scan.h12
-rw-r--r--ydb/core/tx/datashard/datashard_impl.h11
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_cdc_stream_scan.cpp66
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_impl.h8
-rw-r--r--ydb/core/tx/schemeshard/ut_cdc_stream.cpp153
-rw-r--r--ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_datashard_/flat_datashard.schema26
8 files changed, 302 insertions, 25 deletions
diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto
index bd44157c3d7..6d0de59aab9 100644
--- a/ydb/core/protos/tx_datashard.proto
+++ b/ydb/core/protos/tx_datashard.proto
@@ -1450,11 +1450,17 @@ message TEvCdcStreamScanResponse {
ABORTED = 7;
}
+ message TStats {
+ optional uint64 RowsProcessed = 1;
+ optional uint64 BytesProcessed = 2;
+ }
+
optional uint64 TabletId = 1;
optional NKikimrProto.TPathID TablePathId = 2;
optional NKikimrProto.TPathID StreamPathId = 3;
optional EStatus Status = 4;
optional string ErrorDescription = 5;
+ optional TStats Stats = 6;
}
message TEvKqpScan {
diff --git a/ydb/core/tx/datashard/cdc_stream_scan.cpp b/ydb/core/tx/datashard/cdc_stream_scan.cpp
index cd8a52714fe..28f708445d2 100644
--- a/ydb/core/tx/datashard/cdc_stream_scan.cpp
+++ b/ydb/core/tx/datashard/cdc_stream_scan.cpp
@@ -36,6 +36,7 @@ bool TCdcStreamScanManager::Load(NIceDb::TNiceDb& db) {
Y_VERIFY(!Scans.contains(streamPathId));
auto& info = Scans[streamPathId];
+
info.SnapshotVersion = TRowVersion(
rowset.GetValue<Schema::CdcStreamScans::SnapshotStep>(),
rowset.GetValue<Schema::CdcStreamScans::SnapshotTxId>()
@@ -46,6 +47,9 @@ bool TCdcStreamScanManager::Load(NIceDb::TNiceDb& db) {
Y_VERIFY(TSerializedCellVec::TryParse(rowset.GetValue<Schema::CdcStreamScans::LastKey>(), *info.LastKey));
}
+ info.Stats.RowsProcessed = rowset.GetValueOrDefault<Schema::CdcStreamScans::RowsProcessed>(0);
+ info.Stats.BytesProcessed = rowset.GetValueOrDefault<Schema::CdcStreamScans::BytesProcessed>(0);
+
if (!rowset.Next()) {
return false;
}
@@ -139,13 +143,17 @@ void TCdcStreamScanManager::PersistRemove(NIceDb::TNiceDb& db,
.Delete();
}
-void TCdcStreamScanManager::PersistLastKey(NIceDb::TNiceDb& db,
- const TPathId& tablePathId, const TPathId& streamPathId, const TSerializedCellVec& value)
+void TCdcStreamScanManager::PersistProgress(NIceDb::TNiceDb& db,
+ const TPathId& tablePathId, const TPathId& streamPathId, const TScanInfo& info)
{
using Schema = TDataShard::Schema;
db.Table<Schema::CdcStreamScans>()
.Key(tablePathId.OwnerId, tablePathId.LocalPathId, streamPathId.OwnerId, streamPathId.LocalPathId)
- .Update<Schema::CdcStreamScans::LastKey>(value.GetBuffer());
+ .Update(
+ NIceDb::TUpdate<Schema::CdcStreamScans::LastKey>(info.LastKey->GetBuffer()),
+ NIceDb::TUpdate<Schema::CdcStreamScans::RowsProcessed>(info.Stats.RowsProcessed),
+ NIceDb::TUpdate<Schema::CdcStreamScans::BytesProcessed>(info.Stats.BytesProcessed)
+ );
}
class TDataShard::TTxCdcStreamScanProgress
@@ -292,7 +300,8 @@ public:
Y_VERIFY(info);
info->LastKey = key;
- Self->CdcStreamScanManager.PersistLastKey(db, tablePathId, streamPathId, key);
+ info->Stats = ev.Stats;
+ Self->CdcStreamScanManager.PersistProgress(db, tablePathId, streamPathId, *info);
}
Response = MakeHolder<TDataShard::TEvPrivate::TEvCdcStreamScanContinue>();
@@ -318,6 +327,8 @@ public:
}; // TTxCdcStreamScanProgress
class TCdcStreamScan: public IActorCallback, public IScan {
+ using TStats = TCdcStreamScanManager::TStats;
+
struct TDataShardId {
TActorId ActorId;
ui64 TabletId;
@@ -380,6 +391,15 @@ class TCdcStreamScan: public IActorCallback, public IScan {
Reply(NKikimrTxDataShard::TEvCdcStreamScanResponse::IN_PROGRESS);
}
+ void Progress() {
+ Stats.RowsProcessed += Buffer.Rows();
+ Stats.BytesProcessed += Buffer.Bytes();
+
+ Send(DataShard.ActorId, new TDataShard::TEvPrivate::TEvCdcStreamScanProgress(
+ TablePathId, StreamPathId, ReadVersion, ValueTags, std::move(Buffer.Flush()), Stats
+ ));
+ }
+
void Handle(TDataShard::TEvPrivate::TEvCdcStreamScanContinue::TPtr&) {
Driver->Touch(NoMoreData ? EScan::Final : EScan::Feed);
}
@@ -392,6 +412,8 @@ class TCdcStreamScan: public IActorCallback, public IScan {
PathIdFromPathId(StreamPathId, response->Record.MutableStreamPathId());
response->Record.SetStatus(status);
response->Record.SetErrorDescription(error);
+ response->Record.MutableStats()->SetRowsProcessed(Stats.RowsProcessed);
+ response->Record.MutableStats()->SetBytesProcessed(Stats.BytesProcessed);
Send(ReplyTo, std::move(response));
}
@@ -399,7 +421,8 @@ class TCdcStreamScan: public IActorCallback, public IScan {
public:
explicit TCdcStreamScan(const TDataShard* const self, const TActorId& replyTo, ui64 txId,
const TPathId& tablePathId, const TPathId& streamPathId, const TRowVersion& readVersion,
- const TVector<TTag>& valueTags, const TMaybe<TSerializedCellVec>& lastKey, const TLimits& limits)
+ const TVector<TTag>& valueTags, const TMaybe<TSerializedCellVec>& lastKey,
+ const TStats& stats, const TLimits& limits)
: IActorCallback(static_cast<TReceiveFunc>(&TCdcStreamScan::StateWork), NKikimrServices::TActivity::CDC_STREAM_SCAN_ACTOR)
, DataShard{self->SelfId(), self->TabletID()}
, ReplyTo(replyTo)
@@ -412,6 +435,7 @@ public:
, Limits(limits)
, Driver(nullptr)
, NoMoreData(false)
+ , Stats(stats)
{
}
@@ -456,9 +480,7 @@ public:
}
}
- Send(DataShard.ActorId, new TDataShard::TEvPrivate::TEvCdcStreamScanProgress(
- TablePathId, StreamPathId, ReadVersion, ValueTags, std::move(Buffer.Flush())
- ));
+ Progress();
return EScan::Sleep;
}
@@ -469,9 +491,7 @@ public:
return EScan::Final;
}
- Send(DataShard.ActorId, new TDataShard::TEvPrivate::TEvCdcStreamScanProgress(
- TablePathId, StreamPathId, ReadVersion, ValueTags, std::move(Buffer.Flush())
- ));
+ Progress();
return EScan::Sleep;
}
@@ -500,6 +520,7 @@ private:
IDriver* Driver;
bool NoMoreData;
TBuffer Buffer;
+ TStats Stats;
}; // TCdcStreamScan
@@ -621,7 +642,7 @@ public:
const ui64 localTxId = ++Self->NextTieBreakerIndex;
auto scan = MakeHolder<TCdcStreamScan>(Self, Request->Sender, localTxId,
- tablePathId, streamPathId, snapshotVersion, valueTags, info->LastKey, record.GetLimits());
+ tablePathId, streamPathId, snapshotVersion, valueTags, info->LastKey, info->Stats, record.GetLimits());
const ui64 scanId = Self->QueueScan(table->LocalTid, scan.Release(), localTxId,
TScanOptions()
.SetResourceBroker(taskName, taskPrio)
diff --git a/ydb/core/tx/datashard/cdc_stream_scan.h b/ydb/core/tx/datashard/cdc_stream_scan.h
index c061b0ed714..c8be61ded21 100644
--- a/ydb/core/tx/datashard/cdc_stream_scan.h
+++ b/ydb/core/tx/datashard/cdc_stream_scan.h
@@ -12,12 +12,20 @@
namespace NKikimr::NDataShard {
class TCdcStreamScanManager {
+public:
+ struct TStats {
+ ui64 RowsProcessed = 0;
+ ui64 BytesProcessed = 0;
+ };
+
+private:
struct TScanInfo {
TRowVersion SnapshotVersion;
ui64 TxId = 0;
ui64 ScanId = 0;
NActors::TActorId ActorId;
TMaybe<TSerializedCellVec> LastKey;
+ TStats Stats;
};
public:
@@ -44,8 +52,8 @@ public:
const TPathId& tablePathId, const TPathId& streamPathId, const TScanInfo& info);
void PersistRemove(NIceDb::TNiceDb& db,
const TPathId& tablePathId, const TPathId& streamPathId);
- void PersistLastKey(NIceDb::TNiceDb& db,
- const TPathId& tablePathId, const TPathId& streamPathId, const TSerializedCellVec& value);
+ void PersistProgress(NIceDb::TNiceDb& db,
+ const TPathId& tablePathId, const TPathId& streamPathId, const TScanInfo& info);
private:
THashMap<TPathId, TScanInfo> Scans;
diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h
index 36cea7ca134..072c621701f 100644
--- a/ydb/core/tx/datashard/datashard_impl.h
+++ b/ydb/core/tx/datashard/datashard_impl.h
@@ -492,12 +492,14 @@ class TDataShard
const TPathId& streamPathId,
const TRowVersion& readVersion,
const TVector<ui32>& valueTags,
- TVector<std::pair<TSerializedCellVec, TSerializedCellVec>>&& rows)
+ TVector<std::pair<TSerializedCellVec, TSerializedCellVec>>&& rows,
+ const TCdcStreamScanManager::TStats& stats)
: TablePathId(tablePathId)
, StreamPathId(streamPathId)
, ReadVersion(readVersion)
, ValueTags(valueTags)
, Rows(std::move(rows))
+ , Stats(stats)
{
}
@@ -506,6 +508,7 @@ class TDataShard
const TRowVersion ReadVersion;
const TVector<ui32> ValueTags;
TVector<std::pair<TSerializedCellVec, TSerializedCellVec>> Rows;
+ const TCdcStreamScanManager::TStats Stats;
};
struct TEvCdcStreamScanContinue : public TEventLocal<TEvCdcStreamScanContinue, EvCdcStreamScanContinue> {};
@@ -957,6 +960,8 @@ class TDataShard
struct LastKey : Column<5, NScheme::NTypeIds::String> {};
struct SnapshotStep : Column<6, NScheme::NTypeIds::Uint64> {};
struct SnapshotTxId : Column<7, NScheme::NTypeIds::Uint64> {};
+ struct RowsProcessed : Column<8, NScheme::NTypeIds::Uint64> {};
+ struct BytesProcessed : Column<9, NScheme::NTypeIds::Uint64> {};
using TKey = TableKey<TableOwnerId, TablePathId, StreamOwnerId, StreamPathId>;
using TColumns = TableColumns<
@@ -966,7 +971,9 @@ class TDataShard
StreamPathId,
LastKey,
SnapshotStep,
- SnapshotTxId
+ SnapshotTxId,
+ RowsProcessed,
+ BytesProcessed
>;
};
diff --git a/ydb/core/tx/schemeshard/schemeshard_cdc_stream_scan.cpp b/ydb/core/tx/schemeshard/schemeshard_cdc_stream_scan.cpp
index 2f0bfcbc640..aec034e7ec7 100644
--- a/ydb/core/tx/schemeshard/schemeshard_cdc_stream_scan.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_cdc_stream_scan.cpp
@@ -1,5 +1,7 @@
+#include "schemeshard_billing_helpers.h"
#include "schemeshard_impl.h"
+#include <ydb/core/metering/metering.h>
#include <ydb/core/tx/tx_proxy/proxy.h>
#include <util/generic/deque.h>
@@ -11,6 +13,7 @@
#endif
#define LOG_D(stream) LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[CdcStreamScan] " << stream)
+#define LOG_N(stream) LOG_NOTICE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[CdcStreamScan] " << stream)
#define LOG_W(stream) LOG_WARN_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[CdcStreamScan] " << stream)
#define LOG_E(stream) LOG_ERROR_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[CdcStreamScan] " << stream)
@@ -71,6 +74,7 @@ struct TSchemeShard::TCdcStreamScan::TTxProgress: public TTransactionBase<TSchem
// side effects
TDeque<std::tuple<TPathId, TTabletId, THolder<IEventBase>>> ScanRequests;
TPathId StreamToProgress;
+ THolder<NMetering::TEvMetering::TEvWriteMeteringJson> Metering;
THolder<TEvSchemeShard::TEvModifySchemeTransaction> Finalize;
public:
@@ -117,6 +121,10 @@ public:
ctx.Send(ctx.SelfID, new TEvPrivate::TEvRunCdcStreamScan(StreamToProgress));
}
+ if (Metering) {
+ ctx.Send(NMetering::MakeMeteringServiceID(), Metering.Release());
+ }
+
if (Finalize) {
Self->CdcStreamScanFinalizer = ctx.Register(new TCdcStreamScanFinalizer(ctx.SelfID, std::move(Finalize)));
}
@@ -266,6 +274,7 @@ private:
streamInfo->InProgressShards.erase(shardIdx);
Self->CdcStreamScanPipes.Close(streamPathId, tabletId, ctx);
StreamToProgress = streamPathId;
+ Bill(streamPathId, shardIdx, TRUCalculator::ReadTable(record.GetStats().GetBytesProcessed()), ctx);
break;
case NKikimrTxDataShard::TEvCdcStreamScanResponse::OVERLOADED:
@@ -345,6 +354,63 @@ private:
return true;
}
+
+ void Bill(const TPathId& pathId, const TShardIdx& shardIdx, ui64 ru, const TActorContext& ctx) {
+ const auto domainPathId = Self->ResolvePathIdForDomain(pathId);
+
+ Y_VERIFY(Self->SubDomains.contains(domainPathId));
+ auto domainInfo = Self->SubDomains.at(domainPathId);
+
+ if (!Self->IsServerlessDomain(domainInfo)) {
+ LOG_D("Unable to make a bill"
+ << ": streamPathId# " << pathId
+ << ", reason# " << "domain is not a serverless db");
+ return;
+ }
+
+ Y_VERIFY(Self->PathsById.contains(domainPathId));
+ auto domainPath = Self->PathsById.at(domainPathId);
+
+ const auto& attrs = domainPath->UserAttrs->Attrs;
+ if (!attrs.contains("cloud_id")) {
+ LOG_D("Unable to make a bill"
+ << ": streamPathId# " << pathId
+ << ", reason# " << "'cloud_id' not found in user attributes");
+ return;
+ }
+
+ if (!attrs.contains("folder_id")) {
+ LOG_D("Unable to make a bill"
+ << ": streamPathId# " << pathId
+ << ", reason# " << "'folder_id' not found in user attributes");
+ return;
+ }
+
+ if (!attrs.contains("database_id")) {
+ LOG_D("Unable to make a bill"
+ << ": streamPathId# " << pathId
+ << ", reason# " << "'database_id' not found in user attributes");
+ return;
+ }
+
+ const auto now = ctx.Now();
+ const TString id = TStringBuilder() << "cdc_stream_scan"
+ << "-" << pathId.OwnerId << "-" << pathId.LocalPathId
+ << "-" << shardIdx.GetOwnerId() << "-" << shardIdx.GetLocalId();
+ const TString billRecord = TBillRecord()
+ .Id(id)
+ .CloudId(attrs.at("cloud_id"))
+ .FolderId(attrs.at("folder_id"))
+ .ResourceId(attrs.at("database_id"))
+ .SourceWt(now)
+ .Usage(TBillRecord::RequestUnits(Max(ui64(1), ru), now))
+ .ToString();
+
+ LOG_N("Make a bill"
+ << ": streamPathId# " << pathId
+ << ", record# " << billRecord);
+ Metering = MakeHolder<NMetering::TEvMetering::TEvWriteMeteringJson>(std::move(billRecord));
+ }
};
ITransaction* TSchemeShard::CreateTxProgressCdcStreamScan(TEvPrivate::TEvRunCdcStreamScan::TPtr& ev) {
diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.h b/ydb/core/tx/schemeshard/schemeshard_impl.h
index d53c3c090dc..b6b276f1aef 100644
--- a/ydb/core/tx/schemeshard/schemeshard_impl.h
+++ b/ydb/core/tx/schemeshard/schemeshard_impl.h
@@ -341,11 +341,15 @@ public:
return pId == RootPathId();
}
- bool IsServerlessDomain(const TPath& domain) const {
- const auto& resourcesDomainId = domain.DomainInfo()->GetResourcesDomainId();
+ bool IsServerlessDomain(TSubDomainInfo::TPtr domainInfo) const {
+ const auto& resourcesDomainId = domainInfo->GetResourcesDomainId();
return !IsDomainSchemeShard && resourcesDomainId && resourcesDomainId != ParentDomainId;
}
+ bool IsServerlessDomain(const TPath& domain) const {
+ return IsServerlessDomain(domain.DomainInfo());
+ }
+
TPathId MakeLocalId(const TLocalPathId& localPathId) const {
return TPathId(TabletID(), localPathId);
}
diff --git a/ydb/core/tx/schemeshard/ut_cdc_stream.cpp b/ydb/core/tx/schemeshard/ut_cdc_stream.cpp
index c9225677822..3622210f2d8 100644
--- a/ydb/core/tx/schemeshard/ut_cdc_stream.cpp
+++ b/ydb/core/tx/schemeshard/ut_cdc_stream.cpp
@@ -1,5 +1,6 @@
#include <ydb/core/metering/metering.h>
#include <ydb/core/tx/schemeshard/ut_helpers/helpers.h>
+#include <ydb/core/tx/schemeshard/schemeshard_billing_helpers.h>
#include <ydb/core/tx/schemeshard/schemeshard_impl.h>
#include <library/cpp/json/json_reader.h>
@@ -1049,4 +1050,156 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithInitialScanTests) {
env.TestWaitNotification(runtime, txId);
}
+ void Metering(bool serverless) {
+ TTestBasicRuntime runtime;
+ TTestEnv env(runtime, TTestEnvOptions()
+ .EnableProtoSourceIdInfo(true)
+ .EnableChangefeedInitialScan(true));
+ ui64 txId = 100;
+
+ // create shared db
+ TestCreateExtSubDomain(runtime, ++txId, "/MyRoot", R"(
+ Name: "Shared"
+ )");
+ env.TestWaitNotification(runtime, txId);
+
+ TestAlterExtSubDomain(runtime, ++txId, "/MyRoot", R"(
+ Name: "Shared"
+ StoragePools {
+ Name: "pool-1"
+ Kind: "pool-kind-1"
+ }
+ StoragePools {
+ Name: "pool-2"
+ Kind: "pool-kind-2"
+ }
+ PlanResolution: 50
+ Coordinators: 1
+ Mediators: 1
+ TimeCastBucketsPerMediator: 2
+ ExternalSchemeShard: true
+ ExternalHive: false
+ )");
+ env.TestWaitNotification(runtime, txId);
+
+ const auto attrs = AlterUserAttrs({
+ {"cloud_id", "CLOUD_ID_VAL"},
+ {"folder_id", "FOLDER_ID_VAL"},
+ {"database_id", "DATABASE_ID_VAL"}
+ });
+
+ // create serverless db
+ TestCreateExtSubDomain(runtime, ++txId, "/MyRoot", Sprintf(R"(
+ Name: "Serverless"
+ ResourcesDomainKey {
+ SchemeShard: %lu
+ PathId: 2
+ }
+ )", TTestTxConfig::SchemeShard), attrs);
+ env.TestWaitNotification(runtime, txId);
+
+ TestAlterExtSubDomain(runtime, ++txId, "/MyRoot", R"(
+ Name: "Serverless"
+ StoragePools {
+ Name: "pool-1"
+ Kind: "pool-kind-1"
+ }
+ PlanResolution: 50
+ Coordinators: 1
+ Mediators: 1
+ TimeCastBucketsPerMediator: 2
+ ExternalSchemeShard: true
+ ExternalHive: false
+ )");
+ env.TestWaitNotification(runtime, txId);
+
+ TString dbName;
+ if (serverless) {
+ dbName = "/MyRoot/Serverless";
+ } else {
+ dbName = "/MyRoot/Shared";
+ }
+
+ ui64 schemeShard = 0;
+ TestDescribeResult(DescribePath(runtime, dbName), {
+ NLs::PathExist,
+ NLs::ExtractTenantSchemeshard(&schemeShard)
+ });
+
+ UNIT_ASSERT(schemeShard != 0 && schemeShard != TTestTxConfig::SchemeShard);
+
+ TestCreateTable(runtime, schemeShard, ++txId, dbName, R"(
+ Name: "Table"
+ Columns { Name: "key" Type: "Uint64" }
+ Columns { Name: "value" Type: "Uint64" }
+ KeyColumnNames: ["key"]
+ )");
+ env.TestWaitNotification(runtime, txId, schemeShard);
+
+ bool catchMeteringRecord = false;
+ TString meteringRecord;
+ runtime.SetObserverFunc([&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) {
+ switch (ev->GetTypeRewrite()) {
+ case TEvDataShard::EvCdcStreamScanResponse:
+ if (const auto* msg = ev->Get<TEvDataShard::TEvCdcStreamScanResponse>()) {
+ if (msg->Record.GetStatus() == NKikimrTxDataShard::TEvCdcStreamScanResponse::DONE) {
+ catchMeteringRecord = true;
+ }
+ }
+ return TTestActorRuntime::EEventAction::PROCESS;
+ case NMetering::TEvMetering::EvWriteMeteringJson:
+ if (catchMeteringRecord) {
+ meteringRecord = ev->Get<NMetering::TEvMetering::TEvWriteMeteringJson>()->MeteringJson;
+ }
+ return TTestActorRuntime::EEventAction::PROCESS;
+ default:
+ return TTestActorRuntime::EEventAction::PROCESS;
+ }
+ });
+
+ TestCreateCdcStream(runtime, schemeShard, ++txId, dbName, R"(
+ TableName: "Table"
+ StreamDescription {
+ Name: "Stream"
+ Mode: ECdcStreamModeKeysOnly
+ Format: ECdcStreamFormatProto
+ State: ECdcStreamStateScan
+ }
+ )");
+ env.TestWaitNotification(runtime, txId, schemeShard);
+
+ if (serverless) {
+ if (meteringRecord.empty()) {
+ TDispatchOptions opts;
+ opts.FinalEvents.emplace_back([&meteringRecord](IEventHandle&) {
+ return !meteringRecord.empty();
+ });
+ runtime.DispatchEvents(opts);
+ }
+
+ UNIT_ASSERT_STRINGS_EQUAL(meteringRecord, TBillRecord()
+ .Id("cdc_stream_scan-9437197-3-9437197-4")
+ .CloudId("CLOUD_ID_VAL")
+ .FolderId("FOLDER_ID_VAL")
+ .ResourceId("DATABASE_ID_VAL")
+ .SourceWt(TInstant::FromValue(0))
+ .Usage(TBillRecord::RequestUnits(1, TInstant::FromValue(0)))
+ .ToString());
+ } else {
+ for (int i = 0; i < 10; ++i) {
+ env.SimulateSleep(runtime, TDuration::Seconds(1));
+ }
+
+ UNIT_ASSERT(meteringRecord.empty());
+ }
+ }
+
+ Y_UNIT_TEST(MeteringServerless) {
+ Metering(true);
+ }
+
+ Y_UNIT_TEST(MeteringDedicated) {
+ Metering(false);
+ }
+
} // TCdcStreamWithInitialScanTests
diff --git a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_datashard_/flat_datashard.schema b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_datashard_/flat_datashard.schema
index 108cac8fee5..0c51c0565c5 100644
--- a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_datashard_/flat_datashard.schema
+++ b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_datashard_/flat_datashard.schema
@@ -1824,11 +1824,6 @@
],
"ColumnsAdded": [
{
- "ColumnId": 7,
- "ColumnName": "SnapshotTxId",
- "ColumnType": "Uint64"
- },
- {
"ColumnId": 1,
"ColumnName": "TableOwnerId",
"ColumnType": "Uint64"
@@ -1857,19 +1852,36 @@
"ColumnId": 6,
"ColumnName": "SnapshotStep",
"ColumnType": "Uint64"
+ },
+ {
+ "ColumnId": 7,
+ "ColumnName": "SnapshotTxId",
+ "ColumnType": "Uint64"
+ },
+ {
+ "ColumnId": 8,
+ "ColumnName": "RowsProcessed",
+ "ColumnType": "Uint64"
+ },
+ {
+ "ColumnId": 9,
+ "ColumnName": "BytesProcessed",
+ "ColumnType": "Uint64"
}
],
"ColumnsDropped": [],
"ColumnFamilies": {
"0": {
"Columns": [
- 7,
1,
2,
3,
4,
5,
- 6
+ 6,
+ 7,
+ 8,
+ 9
],
"RoomID": 0,
"Codec": 0,