diff options
author | chertus <azuikov@ydb.tech> | 2023-02-10 10:25:06 +0300 |
---|---|---|
committer | chertus <azuikov@ydb.tech> | 2023-02-10 10:25:06 +0300 |
commit | d5c8769855b8d7e7be20ccf3a617a52d80314a7d (patch) | |
tree | 137d574cbc22b0e8cc5955118fdbd48cb8ce09d5 | |
parent | dc0689f4b8d3ad768206ea71f8ad7e0bbdbf1551 (diff) | |
download | ydb-d5c8769855b8d7e7be20ccf3a617a52d80314a7d.tar.gz |
try to fix eviction counters
-rw-r--r-- | ydb/core/testlib/cs_helper.cpp | 8 | ||||
-rw-r--r-- | ydb/core/testlib/cs_helper.h | 1 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/blob_manager.cpp | 7 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/blob_manager.h | 1 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard__progress_tx.cpp | 23 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard__propose_transaction.cpp | 2 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard__write_index.cpp | 19 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard_impl.cpp | 2 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard_private_events.h | 19 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/column_engine.h | 1 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/column_engine_logs.cpp | 35 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/column_engine_logs.h | 11 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/portion_info.h | 7 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/export_actor.cpp | 2 | ||||
-rw-r--r-- | ydb/core/tx/tiering/s3_actor.cpp | 12 | ||||
-rw-r--r-- | ydb/core/tx/tiering/ut/ut_tiers.cpp | 24 |
16 files changed, 103 insertions, 71 deletions
diff --git a/ydb/core/testlib/cs_helper.cpp b/ydb/core/testlib/cs_helper.cpp index 41c26f07b00..42c2969113c 100644 --- a/ydb/core/testlib/cs_helper.cpp +++ b/ydb/core/testlib/cs_helper.cpp @@ -54,10 +54,9 @@ void THelperSchemaless::CreateTestOlapTable(TActorId sender, TString storeOrDirN WaitForSchemeOperation(sender, txId); } -void THelperSchemaless::SendDataViaActorSystem(TString testTable, ui64 pathIdBegin, ui64 tsBegin, size_t rowCount) { +void THelperSchemaless::SendDataViaActorSystem(TString testTable, std::shared_ptr<arrow::RecordBatch> batch) { auto* runtime = Server.GetRuntime(); - auto batch = TestArrowBatch(pathIdBegin, tsBegin, rowCount); UNIT_ASSERT(batch); UNIT_ASSERT(batch->num_rows()); auto data = NArrow::SerializeBatchNoCompression(batch); @@ -94,6 +93,11 @@ void THelperSchemaless::SendDataViaActorSystem(TString testTable, ui64 pathIdBeg runtime->DispatchEvents(options); } +void THelperSchemaless::SendDataViaActorSystem(TString testTable, ui64 pathIdBegin, ui64 tsBegin, size_t rowCount) { + auto batch = TestArrowBatch(pathIdBegin, tsBegin, rowCount); + SendDataViaActorSystem(testTable, batch); +} + // std::shared_ptr<arrow::Schema> THelper::GetArrowSchema() { diff --git a/ydb/core/testlib/cs_helper.h b/ydb/core/testlib/cs_helper.h index 3ce85db2f2e..1e7738c9e16 100644 --- a/ydb/core/testlib/cs_helper.h +++ b/ydb/core/testlib/cs_helper.h @@ -15,6 +15,7 @@ public: void CreateTestOlapStore(TActorId sender, TString scheme); void CreateTestOlapTable(TActorId sender, TString storeOrDirName, TString scheme); void SendDataViaActorSystem(TString testTable, ui64 pathIdBegin, ui64 tsBegin, size_t rowCount); + void SendDataViaActorSystem(TString testTable, std::shared_ptr<arrow::RecordBatch> batch); virtual std::shared_ptr<arrow::RecordBatch> TestArrowBatch(ui64 pathIdBegin, ui64 tsBegin, size_t rowCount) = 0; }; diff --git a/ydb/core/tx/columnshard/blob_manager.cpp b/ydb/core/tx/columnshard/blob_manager.cpp index 5d67ee6e56b..14bcdcec570 100644 --- a/ydb/core/tx/columnshard/blob_manager.cpp +++ b/ydb/core/tx/columnshard/blob_manager.cpp @@ -515,7 +515,7 @@ bool TBlobManager::ExportOneToOne(const TUnifiedBlobId& blobId, const NKikimrTxC .Blob = blobId }; - if (EvictedBlobs.count(evict)) { + if (EvictedBlobs.count(evict) || DroppedEvictedBlobs.count(evict)) { return false; } @@ -693,11 +693,6 @@ void TBlobManager::SetBlobInUse(const TUnifiedBlobId& blobId, bool inUse) { NBlobCache::ForgetBlob(blobId); } -bool TBlobManager::IsEvicting(const TUnifiedBlobId& id) { - TEvictMetadata meta; - return GetEvicted(id, meta).State == EEvictState::EVICTING; -} - bool TBlobManager::ExtractEvicted(TEvictedBlob& evict, TEvictMetadata& meta, bool fromDropped /*= false*/) { if (fromDropped) { if (DroppedEvictedBlobs.count(evict)) { diff --git a/ydb/core/tx/columnshard/blob_manager.h b/ydb/core/tx/columnshard/blob_manager.h index f1bd7af5b3e..8b6405cd80a 100644 --- a/ydb/core/tx/columnshard/blob_manager.h +++ b/ydb/core/tx/columnshard/blob_manager.h @@ -226,7 +226,6 @@ public: CountersUpdate = TBlobManagerCounters(); return res; } - bool IsEvicting(const TUnifiedBlobId& id); // Implementation of IBlobManager interface TBlobBatch StartBlobBatch(ui32 channel = BLOB_CHANNEL) override; diff --git a/ydb/core/tx/columnshard/columnshard__progress_tx.cpp b/ydb/core/tx/columnshard/columnshard__progress_tx.cpp index 2c4a184244a..0bacb0735f4 100644 --- a/ydb/core/tx/columnshard/columnshard__progress_tx.cpp +++ b/ydb/core/tx/columnshard/columnshard__progress_tx.cpp @@ -17,6 +17,12 @@ private: { } }; + enum class ETriggerActivities { + NONE, + POST_INSERT, + POST_SCHEMA + }; + public: TTxProgressTx(TColumnShard* self) : TTransactionBase(self) @@ -76,6 +82,7 @@ public: MakeHolder<TEvColumnShard::TEvNotifyTxCompletionResult>(Self->TabletID(), txId)); } Self->AltersInFlight.erase(txId); + Trigger = ETriggerActivities::POST_SCHEMA; break; } case NKikimrTxColumnShard::TX_KIND_COMMIT: { @@ -107,7 +114,7 @@ public: } Self->CommitsInFlight.erase(txId); Self->UpdateInsertTableCounters(); - StartBackgroundActivities = true; + Trigger = ETriggerActivities::POST_INSERT; break; } default: { @@ -147,14 +154,22 @@ public: Self->Execute(Self->CreateTxRunGc(), ctx); } - if (StartBackgroundActivities) { - Self->EnqueueBackgroundActivities(false, true); + switch (Trigger) { + case ETriggerActivities::POST_INSERT: + Self->EnqueueBackgroundActivities(false, true); + break; + case ETriggerActivities::POST_SCHEMA: + Self->EnqueueBackgroundActivities(); + break; + case ETriggerActivities::NONE: + default: + break; } } private: TVector<TEvent> TxEvents; - bool StartBackgroundActivities{false}; + ETriggerActivities Trigger{ETriggerActivities::NONE}; }; void TColumnShard::EnqueueProgressTx(const TActorContext& ctx) { diff --git a/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp b/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp index 5fa3c2c49fe..7fe1168fe2e 100644 --- a/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp +++ b/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp @@ -243,7 +243,7 @@ bool TTxProposeTransaction::Execute(TTransactionContext& txc, const TActorContex if (statusMessage.empty()) { if (auto event = Self->SetupTtl(pathTtls, true)) { - if (event->NeedWrites()) { + if (event->NeedDataReadWrite()) { ctx.Send(Self->EvictionActor, event.release()); } else { ctx.Send(Self->SelfId(), event->TxEvent.release()); diff --git a/ydb/core/tx/columnshard/columnshard__write_index.cpp b/ydb/core/tx/columnshard/columnshard__write_index.cpp index 37824884410..3cc6d103223 100644 --- a/ydb/core/tx/columnshard/columnshard__write_index.cpp +++ b/ydb/core/tx/columnshard/columnshard__write_index.cpp @@ -24,7 +24,7 @@ public: private: struct TPathIdBlobs { - THashSet<TUnifiedBlobId> Blobs; + THashMap<TUnifiedBlobId, TString> Blobs; ui64 PathId; TPathIdBlobs(const ui64 pathId) : PathId(pathId) { @@ -213,7 +213,7 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext& ctx) Self->UpdateIndexCounters(); } else { - LOG_S_INFO("TTxWriteIndex (" << changes->TypeString() + LOG_S_NOTICE("TTxWriteIndex (" << changes->TypeString() << ") cannot apply changes: " << *changes << " at tablet " << Self->TabletID()); // TODO: delayed insert @@ -230,7 +230,7 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext& ctx) if (it == ExportTierBlobs.end()) { it = ExportTierBlobs.emplace(evFeatures.TargetTierName, TPathIdBlobs(evFeatures.PathId)).first; } - it->second.Blobs.emplace(blobId); + it->second.Blobs.emplace(blobId, TString()); } blobsToExport.clear(); @@ -297,17 +297,12 @@ void TTxWriteIndex::Complete(const TActorContext& ctx) { Self->EnqueueBackgroundActivities(); } - for (auto& [tierName, blobIds] : ExportTierBlobs) { + for (auto& [tierName, pathBlobs] : ExportTierBlobs) { Y_VERIFY(ExportNo); + Y_VERIFY(pathBlobs.PathId); - TEvPrivate::TEvExport::TBlobDataMap blobsData; - for (auto&& i : blobIds.Blobs) { - TEvPrivate::TEvExport::TExportBlobInfo info(blobIds.PathId); - info.Evicting = Self->BlobManager->IsEvicting(i); - blobsData.emplace(i, std::move(info)); - } - - ctx.Send(Self->SelfId(), new TEvPrivate::TEvExport(ExportNo, tierName, std::move(blobsData))); + ctx.Send(Self->SelfId(), + new TEvPrivate::TEvExport(ExportNo, tierName, pathBlobs.PathId, std::move(pathBlobs.Blobs))); ++ExportNo; } diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp index 28152722828..7cb866d4e73 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.cpp +++ b/ydb/core/tx/columnshard/columnshard_impl.cpp @@ -715,7 +715,7 @@ void TColumnShard::EnqueueBackgroundActivities(bool periodic, bool insertOnly) { } if (auto event = SetupTtl()) { - if (event->NeedWrites()) { + if (event->NeedDataReadWrite()) { ctx.Send(EvictionActor, event.release()); } else { ctx.Send(SelfId(), event->TxEvent.release()); diff --git a/ydb/core/tx/columnshard/columnshard_private_events.h b/ydb/core/tx/columnshard/columnshard_private_events.h index 380e1dd4ffd..b91ab112fe7 100644 --- a/ydb/core/tx/columnshard/columnshard_private_events.h +++ b/ydb/core/tx/columnshard/columnshard_private_events.h @@ -110,7 +110,7 @@ struct TEvPrivate { } } - bool NeedWrites() const { + bool NeedDataReadWrite() const { return (TxEvent->PutStatus != NKikimrProto::OK); } }; @@ -124,33 +124,26 @@ struct TEvPrivate { }; struct TEvExport : public TEventLocal<TEvExport, EvExport> { - struct TExportBlobInfo { - const ui64 PathId = 0; - TString Data; - bool Evicting = false; - TExportBlobInfo(const ui64 pathId) - : PathId(pathId) - { - - } - }; - using TBlobDataMap = THashMap<TUnifiedBlobId, TExportBlobInfo>; + using TBlobDataMap = THashMap<TUnifiedBlobId, TString>; NKikimrProto::EReplyStatus Status = NKikimrProto::UNKNOWN; ui64 ExportNo = 0; TString TierName; + ui64 PathId = 0; TActorId DstActor; TBlobDataMap Blobs; THashMap<TUnifiedBlobId, TUnifiedBlobId> SrcToDstBlobs; TMap<TString, TString> ErrorStrings; - explicit TEvExport(ui64 exportNo, const TString& tierName, TBlobDataMap&& tierBlobs) + explicit TEvExport(ui64 exportNo, const TString& tierName, ui64 pathId, TBlobDataMap&& tierBlobs) : ExportNo(exportNo) , TierName(tierName) + , PathId(pathId) , Blobs(std::move(tierBlobs)) { Y_VERIFY(ExportNo); Y_VERIFY(!TierName.empty()); + Y_VERIFY(PathId); Y_VERIFY(!Blobs.empty()); } diff --git a/ydb/core/tx/columnshard/engines/column_engine.h b/ydb/core/tx/columnshard/engines/column_engine.h index f7e9260dfe3..bec3ceb01ca 100644 --- a/ydb/core/tx/columnshard/engines/column_engine.h +++ b/ydb/core/tx/columnshard/engines/column_engine.h @@ -17,6 +17,7 @@ struct TPredicate; struct TCompactionLimits { static constexpr const ui32 MIN_GOOD_BLOB_SIZE = 256 * 1024; // some BlobStorage constant static constexpr const ui32 MAX_BLOB_SIZE = 8 * 1024 * 1024; // some BlobStorage constant + static constexpr const ui64 EVICT_HOT_PORTION_BYTES = 1 * 1024 * 1024; static constexpr const ui64 DEFAULT_EVICTION_BYTES = 64 * 1024 * 1024; static constexpr const ui64 MAX_BLOBS_TO_DELETE = 10000; diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp index a53c9f92b8c..e843d4b204f 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp @@ -440,8 +440,8 @@ const TColumnEngineStats& TColumnEngineForLogs::GetTotalStats() { return Counters; } -void TColumnEngineForLogs::UpdatePortionStats(const TPortionInfo& portionInfo, bool isErase, bool isLoad) { - UpdatePortionStats(Counters, portionInfo, isErase, isLoad); +void TColumnEngineForLogs::UpdatePortionStats(const TPortionInfo& portionInfo, EStatsUpdateType updateType) { + UpdatePortionStats(Counters, portionInfo, updateType); ui64 granule = portionInfo.Granule(); Y_VERIFY(granule); @@ -453,11 +453,11 @@ void TColumnEngineForLogs::UpdatePortionStats(const TPortionInfo& portionInfo, b stats = std::make_shared<TColumnEngineStats>(); stats->Tables = 1; } - UpdatePortionStats(*PathStats[pathId], portionInfo, isErase, isLoad); + UpdatePortionStats(*PathStats[pathId], portionInfo, updateType); } void TColumnEngineForLogs::UpdatePortionStats(TColumnEngineStats& engineStats, const TPortionInfo& portionInfo, - bool isErase, bool isLoad) const { + EStatsUpdateType updateType) const { ui64 columnRecords = portionInfo.Records.size(); ui64 metadataBytes = 0; THashSet<TUnifiedBlobId> blobs; @@ -494,7 +494,13 @@ void TColumnEngineForLogs::UpdatePortionStats(TColumnEngineStats& engineStats, c break; } Y_VERIFY(srcStats); - auto* stats = portionInfo.IsActive() ? srcStats : &engineStats.Inactive; + auto* stats = (updateType == EStatsUpdateType::EVICT) + ? &engineStats.Evicted + : (portionInfo.IsActive() ? srcStats : &engineStats.Inactive); + + bool isErase = updateType == EStatsUpdateType::ERASE; + bool isLoad = updateType == EStatsUpdateType::LOAD; + bool isAppended = portionInfo.IsActive() && (updateType != EStatsUpdateType::EVICT); if (isErase) { // PortionsToDrop engineStats.ColumnRecords -= columnRecords; @@ -505,7 +511,7 @@ void TColumnEngineForLogs::UpdatePortionStats(TColumnEngineStats& engineStats, c stats->Rows -= rows; stats->Bytes -= bytes; stats->RawBytes -= rawBytes; - } else if (isLoad || portionInfo.IsActive()) { // AppendedPortions + } else if (isLoad || isAppended) { // AppendedPortions engineStats.ColumnRecords += columnRecords; engineStats.ColumnMetadataBytes += metadataBytes; @@ -514,7 +520,7 @@ void TColumnEngineForLogs::UpdatePortionStats(TColumnEngineStats& engineStats, c stats->Rows += rows; stats->Bytes += bytes; stats->RawBytes += rawBytes; - } else { // SwitchedPortions + } else { // SwitchedPortions || PortionsToEvict --srcStats->Portions; srcStats->Blobs -= blobs.size(); srcStats->Rows -= rows; @@ -576,7 +582,7 @@ bool TColumnEngineForLogs::Load(IDbWrapper& db, const THashSet<ui64>& pathsToDro CleanupGranules.insert(granule); } for (auto& [_, portionInfo] : spg->Portions) { - UpdatePortionStats(portionInfo, false, true); + UpdatePortionStats(portionInfo, EStatsUpdateType::LOAD); } } @@ -823,11 +829,13 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartTtl(const THash allowEviction = (evicttionSize <= maxEvictBytes); allowDrop = (dropBlobs <= TCompactionLimits::MAX_BLOBS_TO_DELETE); + bool tryEvictPortion = allowEviction && ttl.HasTiers() + && info.EvictReady(TCompactionLimits::EVICT_HOT_PORTION_BYTES); if (auto max = info.MaxValue(ttlColumnId)) { bool keep = NArrow::ScalarLess(expireTimestamp, max); - if (keep && allowEviction && ttl.HasTiers()) { + if (keep && tryEvictPortion) { TString tierName; for (auto& tierRef : ttl.OrderedTiers) { // TODO: lower/upper_bound + move into TEviction auto& tierInfo = tierRef.Get(); @@ -1116,7 +1124,10 @@ bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, const TChanges& changes, } Y_VERIFY(portionInfo.TierName != oldInfo.TierName); - // TODO: update stats + if (apply) { + UpdatePortionStats(oldInfo, EStatsUpdateType::EVICT); + } + if (!UpsertPortion(portionInfo, apply, false)) { LOG_S_ERROR("Cannot evict portion " << portionInfo << " at tablet " << TabletId); return false; @@ -1258,7 +1269,6 @@ void TColumnEngineForLogs::EraseGranule(ui64 pathId, ui64 granule, const TMark& } bool TColumnEngineForLogs::UpsertPortion(const TPortionInfo& portionInfo, bool apply, bool updateStats) { - Y_VERIFY(portionInfo.Valid()); ui64 granule = portionInfo.Granule(); if (!apply) { @@ -1270,6 +1280,7 @@ bool TColumnEngineForLogs::UpsertPortion(const TPortionInfo& portionInfo, bool a return true; } + Y_VERIFY(portionInfo.Valid()); ui64 portion = portionInfo.Portion(); auto& spg = Granules[granule]; Y_VERIFY(spg); @@ -1296,7 +1307,7 @@ bool TColumnEngineForLogs::ErasePortion(const TPortionInfo& portionInfo, bool ap Y_VERIFY(spg); if (spg->Portions.count(portion)) { if (updateStats) { - UpdatePortionStats(spg->Portions[portion], true); + UpdatePortionStats(spg->Portions[portion], EStatsUpdateType::ERASE); } spg->Portions.erase(portion); } else { diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.h b/ydb/core/tx/columnshard/engines/column_engine_logs.h index 7e328c4050b..302e3ba99f0 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.h +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.h @@ -215,6 +215,13 @@ public: LAST_TX_ID, }; + enum class EStatsUpdateType { + DEFAULT = 0, + ERASE, + LOAD, + EVICT + }; + TColumnEngineForLogs(TIndexInfo&& info, ui64 tabletId, const TCompactionLimits& limits = {}); const TIndexInfo& GetIndexInfo() const override { return IndexInfo; } @@ -343,9 +350,9 @@ private: bool UpsertPortion(const TPortionInfo& portionInfo, bool apply, bool updateStats = true); bool ErasePortion(const TPortionInfo& portionInfo, bool apply, bool updateStats = true); void AddColumnRecord(const TColumnRecord& row); - void UpdatePortionStats(const TPortionInfo& portionInfo, bool isErase = false, bool isLoad = false); + void UpdatePortionStats(const TPortionInfo& portionInfo, EStatsUpdateType updateType = EStatsUpdateType::DEFAULT); void UpdatePortionStats(TColumnEngineStats& engineStats, const TPortionInfo& portionInfo, - bool isErase = false, bool isLoad = false) const; + EStatsUpdateType updateType) const; bool CanInsert(const TChanges& changes, const TSnapshot& commitSnap) const; TMap<TSnapshot, TVector<ui64>> GetOrderedPortions(ui64 granule, const TSnapshot& snapshot = TSnapshot::Max()) const; diff --git a/ydb/core/tx/columnshard/engines/portion_info.h b/ydb/core/tx/columnshard/engines/portion_info.h index dcb45573c53..2374bc3dc6a 100644 --- a/ydb/core/tx/columnshard/engines/portion_info.h +++ b/ydb/core/tx/columnshard/engines/portion_info.h @@ -56,6 +56,13 @@ struct TPortionInfo { bool CanHaveDups() const { return !Valid() || IsInserted(); } ui32 NumRecords() const { return Records.size(); } + bool EvictReady(size_t hotSize) const { + return Meta.Produced == TPortionMeta::COMPACTED + || Meta.Produced == TPortionMeta::SPLIT_COMPACTED + || Meta.Produced == TPortionMeta::EVICTED + || (Meta.Produced == TPortionMeta::INSERTED && BlobsSizes().first >= hotSize); + } + ui64 Portion() const { Y_VERIFY(!Empty()); auto& rec = Records[0]; diff --git a/ydb/core/tx/columnshard/export_actor.cpp b/ydb/core/tx/columnshard/export_actor.cpp index 207c5b0b295..5b9cfca6bbb 100644 --- a/ydb/core/tx/columnshard/export_actor.cpp +++ b/ydb/core/tx/columnshard/export_actor.cpp @@ -52,7 +52,7 @@ public: { auto it = Event->Blobs.find(blobId); Y_VERIFY(it != Event->Blobs.end()); - it->second.Data = blobData; + it->second = blobData; } if (BlobsToRead.empty()) { diff --git a/ydb/core/tx/tiering/s3_actor.cpp b/ydb/core/tx/tiering/s3_actor.cpp index abe1eb28a5f..b0893c72fa8 100644 --- a/ydb/core/tx/tiering/s3_actor.cpp +++ b/ydb/core/tx/tiering/s3_actor.cpp @@ -116,24 +116,20 @@ public: void Handle(TEvPrivate::TEvExport::TPtr& ev) { auto& msg = *ev->Get(); ui64 exportNo = msg.ExportNo; - Y_VERIFY(ev->Get()->DstActor == ShardActor); + Y_VERIFY(msg.DstActor == ShardActor); Y_VERIFY(!Exports.count(exportNo)); Exports[exportNo] = TS3Export(ev->Release()); auto& ex = Exports[exportNo]; - for (auto& [blobId, blob] : ex.Blobs()) { - TString key = ex.AddExported(blobId, blob.PathId).GetS3Key(); + for (auto& [blobId, blobData] : ex.Blobs()) { + TString key = ex.AddExported(blobId, msg.PathId).GetS3Key(); Y_VERIFY(!ExportingKeys.count(key)); // TODO ex.RegisterKey(key); ExportingKeys[key] = exportNo; - if (blob.Evicting) { - SendPutObjectIfNotExists(key, std::move(blob.Data)); - } else { - SendPutObject(key, std::move(blob.Data)); - } + SendPutObjectIfNotExists(key, std::move(blobData)); } } diff --git a/ydb/core/tx/tiering/ut/ut_tiers.cpp b/ydb/core/tx/tiering/ut/ut_tiers.cpp index f6e592e1e8c..b6e8e77242c 100644 --- a/ydb/core/tx/tiering/ut/ut_tiers.cpp +++ b/ydb/core/tx/tiering/ut/ut_tiers.cpp @@ -29,8 +29,8 @@ private: using TBase = Tests::NCS::THelper; public: using TBase::TBase; - void CreateTestOlapTable(TString tableName = "olapTable", TString storeName = "olapStore", - ui32 storeShardsCount = 4, ui32 tableShardsCount = 3, + void CreateTestOlapTable(TString tableName = "olapTable", ui32 tableShardsCount = 3, + TString storeName = "olapStore", ui32 storeShardsCount = 4, TString shardingFunction = "HASH_FUNCTION_CLOUD_LOGS") { TActorId sender = Server.GetRuntime()->AllocateEdgeActor(); CreateTestOlapStore(sender, Sprintf(R"( @@ -489,14 +489,22 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) { emulator->SetExpectedTiersCount(2); emulator->CheckRuntime(runtime); } - lHelper.CreateTestOlapTable("olapTable"); + lHelper.CreateTestOlapTable("olapTable", 2); Cerr << "Wait tables" << Endl; runtime.SimulateSleep(TDuration::Seconds(20)); Cerr << "Initialization tables" << Endl; - Cerr << "Insert..." << Endl; const TInstant pkStart = Now() - TDuration::Days(15); ui32 idx = 0; - lHelper.SendDataViaActorSystem("/Root/olapStore/olapTable", 0, (pkStart + TDuration::Seconds(2 * idx++)).GetValue(), 2000); + + auto batch = lHelper.TestArrowBatch(0, (pkStart + TDuration::Seconds(2 * idx++)).GetValue(), 6000); + auto batchSize = NArrow::GetBatchDataSize(batch); + Cerr << "Inserting " << batchSize << " bytes..." << Endl; + UNIT_ASSERT(batchSize > 4 * 1024 * 1024); // NColumnShard::TLimits::MIN_BYTES_TO_INSERT + UNIT_ASSERT(batchSize < 8 * 1024 * 1024); + + for (ui32 i = 0; i < 4; ++i) { + lHelper.SendDataViaActorSystem("/Root/olapStore/olapTable", batch); + } { const TInstant start = Now(); bool check = false; @@ -513,7 +521,7 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) { #endif runtime.SimulateSleep(TDuration::Seconds(1)); } - Y_VERIFY(check); + UNIT_ASSERT(check); } #ifdef S3_TEST_USAGE Cerr << "storage initialized..." << Endl; @@ -536,10 +544,10 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) { #endif runtime.SimulateSleep(TDuration::Seconds(1)); } - Y_VERIFY(check); + UNIT_ASSERT(check); } #ifndef S3_TEST_USAGE - Y_VERIFY(Singleton<NKikimr::NWrappers::NExternalStorage::TFakeExternalStorage>()->GetBucketsCount() == 1); + UNIT_ASSERT_EQUAL(Singleton<NKikimr::NWrappers::NExternalStorage::TFakeExternalStorage>()->GetBucketsCount(), 1); #endif } |