diff options
author | ilnaz <[email protected]> | 2023-01-25 10:14:43 +0300 |
---|---|---|
committer | ilnaz <[email protected]> | 2023-01-25 10:14:43 +0300 |
commit | 09d191f3a27078f10212a016b4219e3734a12975 (patch) | |
tree | 7d720141dd94b2787be820364c8be86cc67c7069 | |
parent | 0974b4afdb8f7e2fa45d54f35298176cff07f0a3 (diff) |
Metering
8 files changed, 302 insertions, 25 deletions
diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto index bd44157c3d7..6d0de59aab9 100644 --- a/ydb/core/protos/tx_datashard.proto +++ b/ydb/core/protos/tx_datashard.proto @@ -1450,11 +1450,17 @@ message TEvCdcStreamScanResponse { ABORTED = 7; } + message TStats { + optional uint64 RowsProcessed = 1; + optional uint64 BytesProcessed = 2; + } + optional uint64 TabletId = 1; optional NKikimrProto.TPathID TablePathId = 2; optional NKikimrProto.TPathID StreamPathId = 3; optional EStatus Status = 4; optional string ErrorDescription = 5; + optional TStats Stats = 6; } message TEvKqpScan { diff --git a/ydb/core/tx/datashard/cdc_stream_scan.cpp b/ydb/core/tx/datashard/cdc_stream_scan.cpp index cd8a52714fe..28f708445d2 100644 --- a/ydb/core/tx/datashard/cdc_stream_scan.cpp +++ b/ydb/core/tx/datashard/cdc_stream_scan.cpp @@ -36,6 +36,7 @@ bool TCdcStreamScanManager::Load(NIceDb::TNiceDb& db) { Y_VERIFY(!Scans.contains(streamPathId)); auto& info = Scans[streamPathId]; + info.SnapshotVersion = TRowVersion( rowset.GetValue<Schema::CdcStreamScans::SnapshotStep>(), rowset.GetValue<Schema::CdcStreamScans::SnapshotTxId>() @@ -46,6 +47,9 @@ bool TCdcStreamScanManager::Load(NIceDb::TNiceDb& db) { Y_VERIFY(TSerializedCellVec::TryParse(rowset.GetValue<Schema::CdcStreamScans::LastKey>(), *info.LastKey)); } + info.Stats.RowsProcessed = rowset.GetValueOrDefault<Schema::CdcStreamScans::RowsProcessed>(0); + info.Stats.BytesProcessed = rowset.GetValueOrDefault<Schema::CdcStreamScans::BytesProcessed>(0); + if (!rowset.Next()) { return false; } @@ -139,13 +143,17 @@ void TCdcStreamScanManager::PersistRemove(NIceDb::TNiceDb& db, .Delete(); } -void TCdcStreamScanManager::PersistLastKey(NIceDb::TNiceDb& db, - const TPathId& tablePathId, const TPathId& streamPathId, const TSerializedCellVec& value) +void TCdcStreamScanManager::PersistProgress(NIceDb::TNiceDb& db, + const TPathId& tablePathId, const TPathId& streamPathId, const TScanInfo& info) { using Schema = TDataShard::Schema; db.Table<Schema::CdcStreamScans>() .Key(tablePathId.OwnerId, tablePathId.LocalPathId, streamPathId.OwnerId, streamPathId.LocalPathId) - .Update<Schema::CdcStreamScans::LastKey>(value.GetBuffer()); + .Update( + NIceDb::TUpdate<Schema::CdcStreamScans::LastKey>(info.LastKey->GetBuffer()), + NIceDb::TUpdate<Schema::CdcStreamScans::RowsProcessed>(info.Stats.RowsProcessed), + NIceDb::TUpdate<Schema::CdcStreamScans::BytesProcessed>(info.Stats.BytesProcessed) + ); } class TDataShard::TTxCdcStreamScanProgress @@ -292,7 +300,8 @@ public: Y_VERIFY(info); info->LastKey = key; - Self->CdcStreamScanManager.PersistLastKey(db, tablePathId, streamPathId, key); + info->Stats = ev.Stats; + Self->CdcStreamScanManager.PersistProgress(db, tablePathId, streamPathId, *info); } Response = MakeHolder<TDataShard::TEvPrivate::TEvCdcStreamScanContinue>(); @@ -318,6 +327,8 @@ public: }; // TTxCdcStreamScanProgress class TCdcStreamScan: public IActorCallback, public IScan { + using TStats = TCdcStreamScanManager::TStats; + struct TDataShardId { TActorId ActorId; ui64 TabletId; @@ -380,6 +391,15 @@ class TCdcStreamScan: public IActorCallback, public IScan { Reply(NKikimrTxDataShard::TEvCdcStreamScanResponse::IN_PROGRESS); } + void Progress() { + Stats.RowsProcessed += Buffer.Rows(); + Stats.BytesProcessed += Buffer.Bytes(); + + Send(DataShard.ActorId, new TDataShard::TEvPrivate::TEvCdcStreamScanProgress( + TablePathId, StreamPathId, ReadVersion, ValueTags, std::move(Buffer.Flush()), Stats + )); + } + void Handle(TDataShard::TEvPrivate::TEvCdcStreamScanContinue::TPtr&) { Driver->Touch(NoMoreData ? EScan::Final : EScan::Feed); } @@ -392,6 +412,8 @@ class TCdcStreamScan: public IActorCallback, public IScan { PathIdFromPathId(StreamPathId, response->Record.MutableStreamPathId()); response->Record.SetStatus(status); response->Record.SetErrorDescription(error); + response->Record.MutableStats()->SetRowsProcessed(Stats.RowsProcessed); + response->Record.MutableStats()->SetBytesProcessed(Stats.BytesProcessed); Send(ReplyTo, std::move(response)); } @@ -399,7 +421,8 @@ class TCdcStreamScan: public IActorCallback, public IScan { public: explicit TCdcStreamScan(const TDataShard* const self, const TActorId& replyTo, ui64 txId, const TPathId& tablePathId, const TPathId& streamPathId, const TRowVersion& readVersion, - const TVector<TTag>& valueTags, const TMaybe<TSerializedCellVec>& lastKey, const TLimits& limits) + const TVector<TTag>& valueTags, const TMaybe<TSerializedCellVec>& lastKey, + const TStats& stats, const TLimits& limits) : IActorCallback(static_cast<TReceiveFunc>(&TCdcStreamScan::StateWork), NKikimrServices::TActivity::CDC_STREAM_SCAN_ACTOR) , DataShard{self->SelfId(), self->TabletID()} , ReplyTo(replyTo) @@ -412,6 +435,7 @@ public: , Limits(limits) , Driver(nullptr) , NoMoreData(false) + , Stats(stats) { } @@ -456,9 +480,7 @@ public: } } - Send(DataShard.ActorId, new TDataShard::TEvPrivate::TEvCdcStreamScanProgress( - TablePathId, StreamPathId, ReadVersion, ValueTags, std::move(Buffer.Flush()) - )); + Progress(); return EScan::Sleep; } @@ -469,9 +491,7 @@ public: return EScan::Final; } - Send(DataShard.ActorId, new TDataShard::TEvPrivate::TEvCdcStreamScanProgress( - TablePathId, StreamPathId, ReadVersion, ValueTags, std::move(Buffer.Flush()) - )); + Progress(); return EScan::Sleep; } @@ -500,6 +520,7 @@ private: IDriver* Driver; bool NoMoreData; TBuffer Buffer; + TStats Stats; }; // TCdcStreamScan @@ -621,7 +642,7 @@ public: const ui64 localTxId = ++Self->NextTieBreakerIndex; auto scan = MakeHolder<TCdcStreamScan>(Self, Request->Sender, localTxId, - tablePathId, streamPathId, snapshotVersion, valueTags, info->LastKey, record.GetLimits()); + tablePathId, streamPathId, snapshotVersion, valueTags, info->LastKey, info->Stats, record.GetLimits()); const ui64 scanId = Self->QueueScan(table->LocalTid, scan.Release(), localTxId, TScanOptions() .SetResourceBroker(taskName, taskPrio) diff --git a/ydb/core/tx/datashard/cdc_stream_scan.h b/ydb/core/tx/datashard/cdc_stream_scan.h index c061b0ed714..c8be61ded21 100644 --- a/ydb/core/tx/datashard/cdc_stream_scan.h +++ b/ydb/core/tx/datashard/cdc_stream_scan.h @@ -12,12 +12,20 @@ namespace NKikimr::NDataShard { class TCdcStreamScanManager { +public: + struct TStats { + ui64 RowsProcessed = 0; + ui64 BytesProcessed = 0; + }; + +private: struct TScanInfo { TRowVersion SnapshotVersion; ui64 TxId = 0; ui64 ScanId = 0; NActors::TActorId ActorId; TMaybe<TSerializedCellVec> LastKey; + TStats Stats; }; public: @@ -44,8 +52,8 @@ public: const TPathId& tablePathId, const TPathId& streamPathId, const TScanInfo& info); void PersistRemove(NIceDb::TNiceDb& db, const TPathId& tablePathId, const TPathId& streamPathId); - void PersistLastKey(NIceDb::TNiceDb& db, - const TPathId& tablePathId, const TPathId& streamPathId, const TSerializedCellVec& value); + void PersistProgress(NIceDb::TNiceDb& db, + const TPathId& tablePathId, const TPathId& streamPathId, const TScanInfo& info); private: THashMap<TPathId, TScanInfo> Scans; diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h index 36cea7ca134..072c621701f 100644 --- a/ydb/core/tx/datashard/datashard_impl.h +++ b/ydb/core/tx/datashard/datashard_impl.h @@ -492,12 +492,14 @@ class TDataShard const TPathId& streamPathId, const TRowVersion& readVersion, const TVector<ui32>& valueTags, - TVector<std::pair<TSerializedCellVec, TSerializedCellVec>>&& rows) + TVector<std::pair<TSerializedCellVec, TSerializedCellVec>>&& rows, + const TCdcStreamScanManager::TStats& stats) : TablePathId(tablePathId) , StreamPathId(streamPathId) , ReadVersion(readVersion) , ValueTags(valueTags) , Rows(std::move(rows)) + , Stats(stats) { } @@ -506,6 +508,7 @@ class TDataShard const TRowVersion ReadVersion; const TVector<ui32> ValueTags; TVector<std::pair<TSerializedCellVec, TSerializedCellVec>> Rows; + const TCdcStreamScanManager::TStats Stats; }; struct TEvCdcStreamScanContinue : public TEventLocal<TEvCdcStreamScanContinue, EvCdcStreamScanContinue> {}; @@ -957,6 +960,8 @@ class TDataShard struct LastKey : Column<5, NScheme::NTypeIds::String> {}; struct SnapshotStep : Column<6, NScheme::NTypeIds::Uint64> {}; struct SnapshotTxId : Column<7, NScheme::NTypeIds::Uint64> {}; + struct RowsProcessed : Column<8, NScheme::NTypeIds::Uint64> {}; + struct BytesProcessed : Column<9, NScheme::NTypeIds::Uint64> {}; using TKey = TableKey<TableOwnerId, TablePathId, StreamOwnerId, StreamPathId>; using TColumns = TableColumns< @@ -966,7 +971,9 @@ class TDataShard StreamPathId, LastKey, SnapshotStep, - SnapshotTxId + SnapshotTxId, + RowsProcessed, + BytesProcessed >; }; diff --git a/ydb/core/tx/schemeshard/schemeshard_cdc_stream_scan.cpp b/ydb/core/tx/schemeshard/schemeshard_cdc_stream_scan.cpp index 2f0bfcbc640..aec034e7ec7 100644 --- a/ydb/core/tx/schemeshard/schemeshard_cdc_stream_scan.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_cdc_stream_scan.cpp @@ -1,5 +1,7 @@ +#include "schemeshard_billing_helpers.h" #include "schemeshard_impl.h" +#include <ydb/core/metering/metering.h> #include <ydb/core/tx/tx_proxy/proxy.h> #include <util/generic/deque.h> @@ -11,6 +13,7 @@ #endif #define LOG_D(stream) LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[CdcStreamScan] " << stream) +#define LOG_N(stream) LOG_NOTICE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[CdcStreamScan] " << stream) #define LOG_W(stream) LOG_WARN_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[CdcStreamScan] " << stream) #define LOG_E(stream) LOG_ERROR_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[CdcStreamScan] " << stream) @@ -71,6 +74,7 @@ struct TSchemeShard::TCdcStreamScan::TTxProgress: public TTransactionBase<TSchem // side effects TDeque<std::tuple<TPathId, TTabletId, THolder<IEventBase>>> ScanRequests; TPathId StreamToProgress; + THolder<NMetering::TEvMetering::TEvWriteMeteringJson> Metering; THolder<TEvSchemeShard::TEvModifySchemeTransaction> Finalize; public: @@ -117,6 +121,10 @@ public: ctx.Send(ctx.SelfID, new TEvPrivate::TEvRunCdcStreamScan(StreamToProgress)); } + if (Metering) { + ctx.Send(NMetering::MakeMeteringServiceID(), Metering.Release()); + } + if (Finalize) { Self->CdcStreamScanFinalizer = ctx.Register(new TCdcStreamScanFinalizer(ctx.SelfID, std::move(Finalize))); } @@ -266,6 +274,7 @@ private: streamInfo->InProgressShards.erase(shardIdx); Self->CdcStreamScanPipes.Close(streamPathId, tabletId, ctx); StreamToProgress = streamPathId; + Bill(streamPathId, shardIdx, TRUCalculator::ReadTable(record.GetStats().GetBytesProcessed()), ctx); break; case NKikimrTxDataShard::TEvCdcStreamScanResponse::OVERLOADED: @@ -345,6 +354,63 @@ private: return true; } + + void Bill(const TPathId& pathId, const TShardIdx& shardIdx, ui64 ru, const TActorContext& ctx) { + const auto domainPathId = Self->ResolvePathIdForDomain(pathId); + + Y_VERIFY(Self->SubDomains.contains(domainPathId)); + auto domainInfo = Self->SubDomains.at(domainPathId); + + if (!Self->IsServerlessDomain(domainInfo)) { + LOG_D("Unable to make a bill" + << ": streamPathId# " << pathId + << ", reason# " << "domain is not a serverless db"); + return; + } + + Y_VERIFY(Self->PathsById.contains(domainPathId)); + auto domainPath = Self->PathsById.at(domainPathId); + + const auto& attrs = domainPath->UserAttrs->Attrs; + if (!attrs.contains("cloud_id")) { + LOG_D("Unable to make a bill" + << ": streamPathId# " << pathId + << ", reason# " << "'cloud_id' not found in user attributes"); + return; + } + + if (!attrs.contains("folder_id")) { + LOG_D("Unable to make a bill" + << ": streamPathId# " << pathId + << ", reason# " << "'folder_id' not found in user attributes"); + return; + } + + if (!attrs.contains("database_id")) { + LOG_D("Unable to make a bill" + << ": streamPathId# " << pathId + << ", reason# " << "'database_id' not found in user attributes"); + return; + } + + const auto now = ctx.Now(); + const TString id = TStringBuilder() << "cdc_stream_scan" + << "-" << pathId.OwnerId << "-" << pathId.LocalPathId + << "-" << shardIdx.GetOwnerId() << "-" << shardIdx.GetLocalId(); + const TString billRecord = TBillRecord() + .Id(id) + .CloudId(attrs.at("cloud_id")) + .FolderId(attrs.at("folder_id")) + .ResourceId(attrs.at("database_id")) + .SourceWt(now) + .Usage(TBillRecord::RequestUnits(Max(ui64(1), ru), now)) + .ToString(); + + LOG_N("Make a bill" + << ": streamPathId# " << pathId + << ", record# " << billRecord); + Metering = MakeHolder<NMetering::TEvMetering::TEvWriteMeteringJson>(std::move(billRecord)); + } }; ITransaction* TSchemeShard::CreateTxProgressCdcStreamScan(TEvPrivate::TEvRunCdcStreamScan::TPtr& ev) { diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.h b/ydb/core/tx/schemeshard/schemeshard_impl.h index d53c3c090dc..b6b276f1aef 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.h +++ b/ydb/core/tx/schemeshard/schemeshard_impl.h @@ -341,11 +341,15 @@ public: return pId == RootPathId(); } - bool IsServerlessDomain(const TPath& domain) const { - const auto& resourcesDomainId = domain.DomainInfo()->GetResourcesDomainId(); + bool IsServerlessDomain(TSubDomainInfo::TPtr domainInfo) const { + const auto& resourcesDomainId = domainInfo->GetResourcesDomainId(); return !IsDomainSchemeShard && resourcesDomainId && resourcesDomainId != ParentDomainId; } + bool IsServerlessDomain(const TPath& domain) const { + return IsServerlessDomain(domain.DomainInfo()); + } + TPathId MakeLocalId(const TLocalPathId& localPathId) const { return TPathId(TabletID(), localPathId); } diff --git a/ydb/core/tx/schemeshard/ut_cdc_stream.cpp b/ydb/core/tx/schemeshard/ut_cdc_stream.cpp index c9225677822..3622210f2d8 100644 --- a/ydb/core/tx/schemeshard/ut_cdc_stream.cpp +++ b/ydb/core/tx/schemeshard/ut_cdc_stream.cpp @@ -1,5 +1,6 @@ #include <ydb/core/metering/metering.h> #include <ydb/core/tx/schemeshard/ut_helpers/helpers.h> +#include <ydb/core/tx/schemeshard/schemeshard_billing_helpers.h> #include <ydb/core/tx/schemeshard/schemeshard_impl.h> #include <library/cpp/json/json_reader.h> @@ -1049,4 +1050,156 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithInitialScanTests) { env.TestWaitNotification(runtime, txId); } + void Metering(bool serverless) { + TTestBasicRuntime runtime; + TTestEnv env(runtime, TTestEnvOptions() + .EnableProtoSourceIdInfo(true) + .EnableChangefeedInitialScan(true)); + ui64 txId = 100; + + // create shared db + TestCreateExtSubDomain(runtime, ++txId, "/MyRoot", R"( + Name: "Shared" + )"); + env.TestWaitNotification(runtime, txId); + + TestAlterExtSubDomain(runtime, ++txId, "/MyRoot", R"( + Name: "Shared" + StoragePools { + Name: "pool-1" + Kind: "pool-kind-1" + } + StoragePools { + Name: "pool-2" + Kind: "pool-kind-2" + } + PlanResolution: 50 + Coordinators: 1 + Mediators: 1 + TimeCastBucketsPerMediator: 2 + ExternalSchemeShard: true + ExternalHive: false + )"); + env.TestWaitNotification(runtime, txId); + + const auto attrs = AlterUserAttrs({ + {"cloud_id", "CLOUD_ID_VAL"}, + {"folder_id", "FOLDER_ID_VAL"}, + {"database_id", "DATABASE_ID_VAL"} + }); + + // create serverless db + TestCreateExtSubDomain(runtime, ++txId, "/MyRoot", Sprintf(R"( + Name: "Serverless" + ResourcesDomainKey { + SchemeShard: %lu + PathId: 2 + } + )", TTestTxConfig::SchemeShard), attrs); + env.TestWaitNotification(runtime, txId); + + TestAlterExtSubDomain(runtime, ++txId, "/MyRoot", R"( + Name: "Serverless" + StoragePools { + Name: "pool-1" + Kind: "pool-kind-1" + } + PlanResolution: 50 + Coordinators: 1 + Mediators: 1 + TimeCastBucketsPerMediator: 2 + ExternalSchemeShard: true + ExternalHive: false + )"); + env.TestWaitNotification(runtime, txId); + + TString dbName; + if (serverless) { + dbName = "/MyRoot/Serverless"; + } else { + dbName = "/MyRoot/Shared"; + } + + ui64 schemeShard = 0; + TestDescribeResult(DescribePath(runtime, dbName), { + NLs::PathExist, + NLs::ExtractTenantSchemeshard(&schemeShard) + }); + + UNIT_ASSERT(schemeShard != 0 && schemeShard != TTestTxConfig::SchemeShard); + + TestCreateTable(runtime, schemeShard, ++txId, dbName, R"( + Name: "Table" + Columns { Name: "key" Type: "Uint64" } + Columns { Name: "value" Type: "Uint64" } + KeyColumnNames: ["key"] + )"); + env.TestWaitNotification(runtime, txId, schemeShard); + + bool catchMeteringRecord = false; + TString meteringRecord; + runtime.SetObserverFunc([&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) { + switch (ev->GetTypeRewrite()) { + case TEvDataShard::EvCdcStreamScanResponse: + if (const auto* msg = ev->Get<TEvDataShard::TEvCdcStreamScanResponse>()) { + if (msg->Record.GetStatus() == NKikimrTxDataShard::TEvCdcStreamScanResponse::DONE) { + catchMeteringRecord = true; + } + } + return TTestActorRuntime::EEventAction::PROCESS; + case NMetering::TEvMetering::EvWriteMeteringJson: + if (catchMeteringRecord) { + meteringRecord = ev->Get<NMetering::TEvMetering::TEvWriteMeteringJson>()->MeteringJson; + } + return TTestActorRuntime::EEventAction::PROCESS; + default: + return TTestActorRuntime::EEventAction::PROCESS; + } + }); + + TestCreateCdcStream(runtime, schemeShard, ++txId, dbName, R"( + TableName: "Table" + StreamDescription { + Name: "Stream" + Mode: ECdcStreamModeKeysOnly + Format: ECdcStreamFormatProto + State: ECdcStreamStateScan + } + )"); + env.TestWaitNotification(runtime, txId, schemeShard); + + if (serverless) { + if (meteringRecord.empty()) { + TDispatchOptions opts; + opts.FinalEvents.emplace_back([&meteringRecord](IEventHandle&) { + return !meteringRecord.empty(); + }); + runtime.DispatchEvents(opts); + } + + UNIT_ASSERT_STRINGS_EQUAL(meteringRecord, TBillRecord() + .Id("cdc_stream_scan-9437197-3-9437197-4") + .CloudId("CLOUD_ID_VAL") + .FolderId("FOLDER_ID_VAL") + .ResourceId("DATABASE_ID_VAL") + .SourceWt(TInstant::FromValue(0)) + .Usage(TBillRecord::RequestUnits(1, TInstant::FromValue(0))) + .ToString()); + } else { + for (int i = 0; i < 10; ++i) { + env.SimulateSleep(runtime, TDuration::Seconds(1)); + } + + UNIT_ASSERT(meteringRecord.empty()); + } + } + + Y_UNIT_TEST(MeteringServerless) { + Metering(true); + } + + Y_UNIT_TEST(MeteringDedicated) { + Metering(false); + } + } // TCdcStreamWithInitialScanTests diff --git a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_datashard_/flat_datashard.schema b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_datashard_/flat_datashard.schema index 108cac8fee5..0c51c0565c5 100644 --- a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_datashard_/flat_datashard.schema +++ b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_datashard_/flat_datashard.schema @@ -1824,11 +1824,6 @@ ], "ColumnsAdded": [ { - "ColumnId": 7, - "ColumnName": "SnapshotTxId", - "ColumnType": "Uint64" - }, - { "ColumnId": 1, "ColumnName": "TableOwnerId", "ColumnType": "Uint64" @@ -1857,19 +1852,36 @@ "ColumnId": 6, "ColumnName": "SnapshotStep", "ColumnType": "Uint64" + }, + { + "ColumnId": 7, + "ColumnName": "SnapshotTxId", + "ColumnType": "Uint64" + }, + { + "ColumnId": 8, + "ColumnName": "RowsProcessed", + "ColumnType": "Uint64" + }, + { + "ColumnId": 9, + "ColumnName": "BytesProcessed", + "ColumnType": "Uint64" } ], "ColumnsDropped": [], "ColumnFamilies": { "0": { "Columns": [ - 7, 1, 2, 3, 4, 5, - 6 + 6, + 7, + 8, + 9 ], "RoomID": 0, "Codec": 0, |