aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorchertus <azuikov@ydb.tech>2023-02-10 10:25:06 +0300
committerchertus <azuikov@ydb.tech>2023-02-10 10:25:06 +0300
commitd5c8769855b8d7e7be20ccf3a617a52d80314a7d (patch)
tree137d574cbc22b0e8cc5955118fdbd48cb8ce09d5
parentdc0689f4b8d3ad768206ea71f8ad7e0bbdbf1551 (diff)
downloadydb-d5c8769855b8d7e7be20ccf3a617a52d80314a7d.tar.gz
try to fix eviction counters
-rw-r--r--ydb/core/testlib/cs_helper.cpp8
-rw-r--r--ydb/core/testlib/cs_helper.h1
-rw-r--r--ydb/core/tx/columnshard/blob_manager.cpp7
-rw-r--r--ydb/core/tx/columnshard/blob_manager.h1
-rw-r--r--ydb/core/tx/columnshard/columnshard__progress_tx.cpp23
-rw-r--r--ydb/core/tx/columnshard/columnshard__propose_transaction.cpp2
-rw-r--r--ydb/core/tx/columnshard/columnshard__write_index.cpp19
-rw-r--r--ydb/core/tx/columnshard/columnshard_impl.cpp2
-rw-r--r--ydb/core/tx/columnshard/columnshard_private_events.h19
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine.h1
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine_logs.cpp35
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine_logs.h11
-rw-r--r--ydb/core/tx/columnshard/engines/portion_info.h7
-rw-r--r--ydb/core/tx/columnshard/export_actor.cpp2
-rw-r--r--ydb/core/tx/tiering/s3_actor.cpp12
-rw-r--r--ydb/core/tx/tiering/ut/ut_tiers.cpp24
16 files changed, 103 insertions, 71 deletions
diff --git a/ydb/core/testlib/cs_helper.cpp b/ydb/core/testlib/cs_helper.cpp
index 41c26f07b00..42c2969113c 100644
--- a/ydb/core/testlib/cs_helper.cpp
+++ b/ydb/core/testlib/cs_helper.cpp
@@ -54,10 +54,9 @@ void THelperSchemaless::CreateTestOlapTable(TActorId sender, TString storeOrDirN
WaitForSchemeOperation(sender, txId);
}
-void THelperSchemaless::SendDataViaActorSystem(TString testTable, ui64 pathIdBegin, ui64 tsBegin, size_t rowCount) {
+void THelperSchemaless::SendDataViaActorSystem(TString testTable, std::shared_ptr<arrow::RecordBatch> batch) {
auto* runtime = Server.GetRuntime();
- auto batch = TestArrowBatch(pathIdBegin, tsBegin, rowCount);
UNIT_ASSERT(batch);
UNIT_ASSERT(batch->num_rows());
auto data = NArrow::SerializeBatchNoCompression(batch);
@@ -94,6 +93,11 @@ void THelperSchemaless::SendDataViaActorSystem(TString testTable, ui64 pathIdBeg
runtime->DispatchEvents(options);
}
+void THelperSchemaless::SendDataViaActorSystem(TString testTable, ui64 pathIdBegin, ui64 tsBegin, size_t rowCount) {
+ auto batch = TestArrowBatch(pathIdBegin, tsBegin, rowCount);
+ SendDataViaActorSystem(testTable, batch);
+}
+
//
std::shared_ptr<arrow::Schema> THelper::GetArrowSchema() {
diff --git a/ydb/core/testlib/cs_helper.h b/ydb/core/testlib/cs_helper.h
index 3ce85db2f2e..1e7738c9e16 100644
--- a/ydb/core/testlib/cs_helper.h
+++ b/ydb/core/testlib/cs_helper.h
@@ -15,6 +15,7 @@ public:
void CreateTestOlapStore(TActorId sender, TString scheme);
void CreateTestOlapTable(TActorId sender, TString storeOrDirName, TString scheme);
void SendDataViaActorSystem(TString testTable, ui64 pathIdBegin, ui64 tsBegin, size_t rowCount);
+ void SendDataViaActorSystem(TString testTable, std::shared_ptr<arrow::RecordBatch> batch);
virtual std::shared_ptr<arrow::RecordBatch> TestArrowBatch(ui64 pathIdBegin, ui64 tsBegin, size_t rowCount) = 0;
};
diff --git a/ydb/core/tx/columnshard/blob_manager.cpp b/ydb/core/tx/columnshard/blob_manager.cpp
index 5d67ee6e56b..14bcdcec570 100644
--- a/ydb/core/tx/columnshard/blob_manager.cpp
+++ b/ydb/core/tx/columnshard/blob_manager.cpp
@@ -515,7 +515,7 @@ bool TBlobManager::ExportOneToOne(const TUnifiedBlobId& blobId, const NKikimrTxC
.Blob = blobId
};
- if (EvictedBlobs.count(evict)) {
+ if (EvictedBlobs.count(evict) || DroppedEvictedBlobs.count(evict)) {
return false;
}
@@ -693,11 +693,6 @@ void TBlobManager::SetBlobInUse(const TUnifiedBlobId& blobId, bool inUse) {
NBlobCache::ForgetBlob(blobId);
}
-bool TBlobManager::IsEvicting(const TUnifiedBlobId& id) {
- TEvictMetadata meta;
- return GetEvicted(id, meta).State == EEvictState::EVICTING;
-}
-
bool TBlobManager::ExtractEvicted(TEvictedBlob& evict, TEvictMetadata& meta, bool fromDropped /*= false*/) {
if (fromDropped) {
if (DroppedEvictedBlobs.count(evict)) {
diff --git a/ydb/core/tx/columnshard/blob_manager.h b/ydb/core/tx/columnshard/blob_manager.h
index f1bd7af5b3e..8b6405cd80a 100644
--- a/ydb/core/tx/columnshard/blob_manager.h
+++ b/ydb/core/tx/columnshard/blob_manager.h
@@ -226,7 +226,6 @@ public:
CountersUpdate = TBlobManagerCounters();
return res;
}
- bool IsEvicting(const TUnifiedBlobId& id);
// Implementation of IBlobManager interface
TBlobBatch StartBlobBatch(ui32 channel = BLOB_CHANNEL) override;
diff --git a/ydb/core/tx/columnshard/columnshard__progress_tx.cpp b/ydb/core/tx/columnshard/columnshard__progress_tx.cpp
index 2c4a184244a..0bacb0735f4 100644
--- a/ydb/core/tx/columnshard/columnshard__progress_tx.cpp
+++ b/ydb/core/tx/columnshard/columnshard__progress_tx.cpp
@@ -17,6 +17,12 @@ private:
{ }
};
+ enum class ETriggerActivities {
+ NONE,
+ POST_INSERT,
+ POST_SCHEMA
+ };
+
public:
TTxProgressTx(TColumnShard* self)
: TTransactionBase(self)
@@ -76,6 +82,7 @@ public:
MakeHolder<TEvColumnShard::TEvNotifyTxCompletionResult>(Self->TabletID(), txId));
}
Self->AltersInFlight.erase(txId);
+ Trigger = ETriggerActivities::POST_SCHEMA;
break;
}
case NKikimrTxColumnShard::TX_KIND_COMMIT: {
@@ -107,7 +114,7 @@ public:
}
Self->CommitsInFlight.erase(txId);
Self->UpdateInsertTableCounters();
- StartBackgroundActivities = true;
+ Trigger = ETriggerActivities::POST_INSERT;
break;
}
default: {
@@ -147,14 +154,22 @@ public:
Self->Execute(Self->CreateTxRunGc(), ctx);
}
- if (StartBackgroundActivities) {
- Self->EnqueueBackgroundActivities(false, true);
+ switch (Trigger) {
+ case ETriggerActivities::POST_INSERT:
+ Self->EnqueueBackgroundActivities(false, true);
+ break;
+ case ETriggerActivities::POST_SCHEMA:
+ Self->EnqueueBackgroundActivities();
+ break;
+ case ETriggerActivities::NONE:
+ default:
+ break;
}
}
private:
TVector<TEvent> TxEvents;
- bool StartBackgroundActivities{false};
+ ETriggerActivities Trigger{ETriggerActivities::NONE};
};
void TColumnShard::EnqueueProgressTx(const TActorContext& ctx) {
diff --git a/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp b/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp
index 5fa3c2c49fe..7fe1168fe2e 100644
--- a/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp
+++ b/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp
@@ -243,7 +243,7 @@ bool TTxProposeTransaction::Execute(TTransactionContext& txc, const TActorContex
if (statusMessage.empty()) {
if (auto event = Self->SetupTtl(pathTtls, true)) {
- if (event->NeedWrites()) {
+ if (event->NeedDataReadWrite()) {
ctx.Send(Self->EvictionActor, event.release());
} else {
ctx.Send(Self->SelfId(), event->TxEvent.release());
diff --git a/ydb/core/tx/columnshard/columnshard__write_index.cpp b/ydb/core/tx/columnshard/columnshard__write_index.cpp
index 37824884410..3cc6d103223 100644
--- a/ydb/core/tx/columnshard/columnshard__write_index.cpp
+++ b/ydb/core/tx/columnshard/columnshard__write_index.cpp
@@ -24,7 +24,7 @@ public:
private:
struct TPathIdBlobs {
- THashSet<TUnifiedBlobId> Blobs;
+ THashMap<TUnifiedBlobId, TString> Blobs;
ui64 PathId;
TPathIdBlobs(const ui64 pathId)
: PathId(pathId) {
@@ -213,7 +213,7 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext& ctx)
Self->UpdateIndexCounters();
} else {
- LOG_S_INFO("TTxWriteIndex (" << changes->TypeString()
+ LOG_S_NOTICE("TTxWriteIndex (" << changes->TypeString()
<< ") cannot apply changes: " << *changes << " at tablet " << Self->TabletID());
// TODO: delayed insert
@@ -230,7 +230,7 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext& ctx)
if (it == ExportTierBlobs.end()) {
it = ExportTierBlobs.emplace(evFeatures.TargetTierName, TPathIdBlobs(evFeatures.PathId)).first;
}
- it->second.Blobs.emplace(blobId);
+ it->second.Blobs.emplace(blobId, TString());
}
blobsToExport.clear();
@@ -297,17 +297,12 @@ void TTxWriteIndex::Complete(const TActorContext& ctx) {
Self->EnqueueBackgroundActivities();
}
- for (auto& [tierName, blobIds] : ExportTierBlobs) {
+ for (auto& [tierName, pathBlobs] : ExportTierBlobs) {
Y_VERIFY(ExportNo);
+ Y_VERIFY(pathBlobs.PathId);
- TEvPrivate::TEvExport::TBlobDataMap blobsData;
- for (auto&& i : blobIds.Blobs) {
- TEvPrivate::TEvExport::TExportBlobInfo info(blobIds.PathId);
- info.Evicting = Self->BlobManager->IsEvicting(i);
- blobsData.emplace(i, std::move(info));
- }
-
- ctx.Send(Self->SelfId(), new TEvPrivate::TEvExport(ExportNo, tierName, std::move(blobsData)));
+ ctx.Send(Self->SelfId(),
+ new TEvPrivate::TEvExport(ExportNo, tierName, pathBlobs.PathId, std::move(pathBlobs.Blobs)));
++ExportNo;
}
diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp
index 28152722828..7cb866d4e73 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.cpp
+++ b/ydb/core/tx/columnshard/columnshard_impl.cpp
@@ -715,7 +715,7 @@ void TColumnShard::EnqueueBackgroundActivities(bool periodic, bool insertOnly) {
}
if (auto event = SetupTtl()) {
- if (event->NeedWrites()) {
+ if (event->NeedDataReadWrite()) {
ctx.Send(EvictionActor, event.release());
} else {
ctx.Send(SelfId(), event->TxEvent.release());
diff --git a/ydb/core/tx/columnshard/columnshard_private_events.h b/ydb/core/tx/columnshard/columnshard_private_events.h
index 380e1dd4ffd..b91ab112fe7 100644
--- a/ydb/core/tx/columnshard/columnshard_private_events.h
+++ b/ydb/core/tx/columnshard/columnshard_private_events.h
@@ -110,7 +110,7 @@ struct TEvPrivate {
}
}
- bool NeedWrites() const {
+ bool NeedDataReadWrite() const {
return (TxEvent->PutStatus != NKikimrProto::OK);
}
};
@@ -124,33 +124,26 @@ struct TEvPrivate {
};
struct TEvExport : public TEventLocal<TEvExport, EvExport> {
- struct TExportBlobInfo {
- const ui64 PathId = 0;
- TString Data;
- bool Evicting = false;
- TExportBlobInfo(const ui64 pathId)
- : PathId(pathId)
- {
-
- }
- };
- using TBlobDataMap = THashMap<TUnifiedBlobId, TExportBlobInfo>;
+ using TBlobDataMap = THashMap<TUnifiedBlobId, TString>;
NKikimrProto::EReplyStatus Status = NKikimrProto::UNKNOWN;
ui64 ExportNo = 0;
TString TierName;
+ ui64 PathId = 0;
TActorId DstActor;
TBlobDataMap Blobs;
THashMap<TUnifiedBlobId, TUnifiedBlobId> SrcToDstBlobs;
TMap<TString, TString> ErrorStrings;
- explicit TEvExport(ui64 exportNo, const TString& tierName, TBlobDataMap&& tierBlobs)
+ explicit TEvExport(ui64 exportNo, const TString& tierName, ui64 pathId, TBlobDataMap&& tierBlobs)
: ExportNo(exportNo)
, TierName(tierName)
+ , PathId(pathId)
, Blobs(std::move(tierBlobs))
{
Y_VERIFY(ExportNo);
Y_VERIFY(!TierName.empty());
+ Y_VERIFY(PathId);
Y_VERIFY(!Blobs.empty());
}
diff --git a/ydb/core/tx/columnshard/engines/column_engine.h b/ydb/core/tx/columnshard/engines/column_engine.h
index f7e9260dfe3..bec3ceb01ca 100644
--- a/ydb/core/tx/columnshard/engines/column_engine.h
+++ b/ydb/core/tx/columnshard/engines/column_engine.h
@@ -17,6 +17,7 @@ struct TPredicate;
struct TCompactionLimits {
static constexpr const ui32 MIN_GOOD_BLOB_SIZE = 256 * 1024; // some BlobStorage constant
static constexpr const ui32 MAX_BLOB_SIZE = 8 * 1024 * 1024; // some BlobStorage constant
+ static constexpr const ui64 EVICT_HOT_PORTION_BYTES = 1 * 1024 * 1024;
static constexpr const ui64 DEFAULT_EVICTION_BYTES = 64 * 1024 * 1024;
static constexpr const ui64 MAX_BLOBS_TO_DELETE = 10000;
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
index a53c9f92b8c..e843d4b204f 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
@@ -440,8 +440,8 @@ const TColumnEngineStats& TColumnEngineForLogs::GetTotalStats() {
return Counters;
}
-void TColumnEngineForLogs::UpdatePortionStats(const TPortionInfo& portionInfo, bool isErase, bool isLoad) {
- UpdatePortionStats(Counters, portionInfo, isErase, isLoad);
+void TColumnEngineForLogs::UpdatePortionStats(const TPortionInfo& portionInfo, EStatsUpdateType updateType) {
+ UpdatePortionStats(Counters, portionInfo, updateType);
ui64 granule = portionInfo.Granule();
Y_VERIFY(granule);
@@ -453,11 +453,11 @@ void TColumnEngineForLogs::UpdatePortionStats(const TPortionInfo& portionInfo, b
stats = std::make_shared<TColumnEngineStats>();
stats->Tables = 1;
}
- UpdatePortionStats(*PathStats[pathId], portionInfo, isErase, isLoad);
+ UpdatePortionStats(*PathStats[pathId], portionInfo, updateType);
}
void TColumnEngineForLogs::UpdatePortionStats(TColumnEngineStats& engineStats, const TPortionInfo& portionInfo,
- bool isErase, bool isLoad) const {
+ EStatsUpdateType updateType) const {
ui64 columnRecords = portionInfo.Records.size();
ui64 metadataBytes = 0;
THashSet<TUnifiedBlobId> blobs;
@@ -494,7 +494,13 @@ void TColumnEngineForLogs::UpdatePortionStats(TColumnEngineStats& engineStats, c
break;
}
Y_VERIFY(srcStats);
- auto* stats = portionInfo.IsActive() ? srcStats : &engineStats.Inactive;
+ auto* stats = (updateType == EStatsUpdateType::EVICT)
+ ? &engineStats.Evicted
+ : (portionInfo.IsActive() ? srcStats : &engineStats.Inactive);
+
+ bool isErase = updateType == EStatsUpdateType::ERASE;
+ bool isLoad = updateType == EStatsUpdateType::LOAD;
+ bool isAppended = portionInfo.IsActive() && (updateType != EStatsUpdateType::EVICT);
if (isErase) { // PortionsToDrop
engineStats.ColumnRecords -= columnRecords;
@@ -505,7 +511,7 @@ void TColumnEngineForLogs::UpdatePortionStats(TColumnEngineStats& engineStats, c
stats->Rows -= rows;
stats->Bytes -= bytes;
stats->RawBytes -= rawBytes;
- } else if (isLoad || portionInfo.IsActive()) { // AppendedPortions
+ } else if (isLoad || isAppended) { // AppendedPortions
engineStats.ColumnRecords += columnRecords;
engineStats.ColumnMetadataBytes += metadataBytes;
@@ -514,7 +520,7 @@ void TColumnEngineForLogs::UpdatePortionStats(TColumnEngineStats& engineStats, c
stats->Rows += rows;
stats->Bytes += bytes;
stats->RawBytes += rawBytes;
- } else { // SwitchedPortions
+ } else { // SwitchedPortions || PortionsToEvict
--srcStats->Portions;
srcStats->Blobs -= blobs.size();
srcStats->Rows -= rows;
@@ -576,7 +582,7 @@ bool TColumnEngineForLogs::Load(IDbWrapper& db, const THashSet<ui64>& pathsToDro
CleanupGranules.insert(granule);
}
for (auto& [_, portionInfo] : spg->Portions) {
- UpdatePortionStats(portionInfo, false, true);
+ UpdatePortionStats(portionInfo, EStatsUpdateType::LOAD);
}
}
@@ -823,11 +829,13 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartTtl(const THash
allowEviction = (evicttionSize <= maxEvictBytes);
allowDrop = (dropBlobs <= TCompactionLimits::MAX_BLOBS_TO_DELETE);
+ bool tryEvictPortion = allowEviction && ttl.HasTiers()
+ && info.EvictReady(TCompactionLimits::EVICT_HOT_PORTION_BYTES);
if (auto max = info.MaxValue(ttlColumnId)) {
bool keep = NArrow::ScalarLess(expireTimestamp, max);
- if (keep && allowEviction && ttl.HasTiers()) {
+ if (keep && tryEvictPortion) {
TString tierName;
for (auto& tierRef : ttl.OrderedTiers) { // TODO: lower/upper_bound + move into TEviction
auto& tierInfo = tierRef.Get();
@@ -1116,7 +1124,10 @@ bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, const TChanges& changes,
}
Y_VERIFY(portionInfo.TierName != oldInfo.TierName);
- // TODO: update stats
+ if (apply) {
+ UpdatePortionStats(oldInfo, EStatsUpdateType::EVICT);
+ }
+
if (!UpsertPortion(portionInfo, apply, false)) {
LOG_S_ERROR("Cannot evict portion " << portionInfo << " at tablet " << TabletId);
return false;
@@ -1258,7 +1269,6 @@ void TColumnEngineForLogs::EraseGranule(ui64 pathId, ui64 granule, const TMark&
}
bool TColumnEngineForLogs::UpsertPortion(const TPortionInfo& portionInfo, bool apply, bool updateStats) {
- Y_VERIFY(portionInfo.Valid());
ui64 granule = portionInfo.Granule();
if (!apply) {
@@ -1270,6 +1280,7 @@ bool TColumnEngineForLogs::UpsertPortion(const TPortionInfo& portionInfo, bool a
return true;
}
+ Y_VERIFY(portionInfo.Valid());
ui64 portion = portionInfo.Portion();
auto& spg = Granules[granule];
Y_VERIFY(spg);
@@ -1296,7 +1307,7 @@ bool TColumnEngineForLogs::ErasePortion(const TPortionInfo& portionInfo, bool ap
Y_VERIFY(spg);
if (spg->Portions.count(portion)) {
if (updateStats) {
- UpdatePortionStats(spg->Portions[portion], true);
+ UpdatePortionStats(spg->Portions[portion], EStatsUpdateType::ERASE);
}
spg->Portions.erase(portion);
} else {
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.h b/ydb/core/tx/columnshard/engines/column_engine_logs.h
index 7e328c4050b..302e3ba99f0 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.h
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.h
@@ -215,6 +215,13 @@ public:
LAST_TX_ID,
};
+ enum class EStatsUpdateType {
+ DEFAULT = 0,
+ ERASE,
+ LOAD,
+ EVICT
+ };
+
TColumnEngineForLogs(TIndexInfo&& info, ui64 tabletId, const TCompactionLimits& limits = {});
const TIndexInfo& GetIndexInfo() const override { return IndexInfo; }
@@ -343,9 +350,9 @@ private:
bool UpsertPortion(const TPortionInfo& portionInfo, bool apply, bool updateStats = true);
bool ErasePortion(const TPortionInfo& portionInfo, bool apply, bool updateStats = true);
void AddColumnRecord(const TColumnRecord& row);
- void UpdatePortionStats(const TPortionInfo& portionInfo, bool isErase = false, bool isLoad = false);
+ void UpdatePortionStats(const TPortionInfo& portionInfo, EStatsUpdateType updateType = EStatsUpdateType::DEFAULT);
void UpdatePortionStats(TColumnEngineStats& engineStats, const TPortionInfo& portionInfo,
- bool isErase = false, bool isLoad = false) const;
+ EStatsUpdateType updateType) const;
bool CanInsert(const TChanges& changes, const TSnapshot& commitSnap) const;
TMap<TSnapshot, TVector<ui64>> GetOrderedPortions(ui64 granule, const TSnapshot& snapshot = TSnapshot::Max()) const;
diff --git a/ydb/core/tx/columnshard/engines/portion_info.h b/ydb/core/tx/columnshard/engines/portion_info.h
index dcb45573c53..2374bc3dc6a 100644
--- a/ydb/core/tx/columnshard/engines/portion_info.h
+++ b/ydb/core/tx/columnshard/engines/portion_info.h
@@ -56,6 +56,13 @@ struct TPortionInfo {
bool CanHaveDups() const { return !Valid() || IsInserted(); }
ui32 NumRecords() const { return Records.size(); }
+ bool EvictReady(size_t hotSize) const {
+ return Meta.Produced == TPortionMeta::COMPACTED
+ || Meta.Produced == TPortionMeta::SPLIT_COMPACTED
+ || Meta.Produced == TPortionMeta::EVICTED
+ || (Meta.Produced == TPortionMeta::INSERTED && BlobsSizes().first >= hotSize);
+ }
+
ui64 Portion() const {
Y_VERIFY(!Empty());
auto& rec = Records[0];
diff --git a/ydb/core/tx/columnshard/export_actor.cpp b/ydb/core/tx/columnshard/export_actor.cpp
index 207c5b0b295..5b9cfca6bbb 100644
--- a/ydb/core/tx/columnshard/export_actor.cpp
+++ b/ydb/core/tx/columnshard/export_actor.cpp
@@ -52,7 +52,7 @@ public:
{
auto it = Event->Blobs.find(blobId);
Y_VERIFY(it != Event->Blobs.end());
- it->second.Data = blobData;
+ it->second = blobData;
}
if (BlobsToRead.empty()) {
diff --git a/ydb/core/tx/tiering/s3_actor.cpp b/ydb/core/tx/tiering/s3_actor.cpp
index abe1eb28a5f..b0893c72fa8 100644
--- a/ydb/core/tx/tiering/s3_actor.cpp
+++ b/ydb/core/tx/tiering/s3_actor.cpp
@@ -116,24 +116,20 @@ public:
void Handle(TEvPrivate::TEvExport::TPtr& ev) {
auto& msg = *ev->Get();
ui64 exportNo = msg.ExportNo;
- Y_VERIFY(ev->Get()->DstActor == ShardActor);
+ Y_VERIFY(msg.DstActor == ShardActor);
Y_VERIFY(!Exports.count(exportNo));
Exports[exportNo] = TS3Export(ev->Release());
auto& ex = Exports[exportNo];
- for (auto& [blobId, blob] : ex.Blobs()) {
- TString key = ex.AddExported(blobId, blob.PathId).GetS3Key();
+ for (auto& [blobId, blobData] : ex.Blobs()) {
+ TString key = ex.AddExported(blobId, msg.PathId).GetS3Key();
Y_VERIFY(!ExportingKeys.count(key)); // TODO
ex.RegisterKey(key);
ExportingKeys[key] = exportNo;
- if (blob.Evicting) {
- SendPutObjectIfNotExists(key, std::move(blob.Data));
- } else {
- SendPutObject(key, std::move(blob.Data));
- }
+ SendPutObjectIfNotExists(key, std::move(blobData));
}
}
diff --git a/ydb/core/tx/tiering/ut/ut_tiers.cpp b/ydb/core/tx/tiering/ut/ut_tiers.cpp
index f6e592e1e8c..b6e8e77242c 100644
--- a/ydb/core/tx/tiering/ut/ut_tiers.cpp
+++ b/ydb/core/tx/tiering/ut/ut_tiers.cpp
@@ -29,8 +29,8 @@ private:
using TBase = Tests::NCS::THelper;
public:
using TBase::TBase;
- void CreateTestOlapTable(TString tableName = "olapTable", TString storeName = "olapStore",
- ui32 storeShardsCount = 4, ui32 tableShardsCount = 3,
+ void CreateTestOlapTable(TString tableName = "olapTable", ui32 tableShardsCount = 3,
+ TString storeName = "olapStore", ui32 storeShardsCount = 4,
TString shardingFunction = "HASH_FUNCTION_CLOUD_LOGS") {
TActorId sender = Server.GetRuntime()->AllocateEdgeActor();
CreateTestOlapStore(sender, Sprintf(R"(
@@ -489,14 +489,22 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) {
emulator->SetExpectedTiersCount(2);
emulator->CheckRuntime(runtime);
}
- lHelper.CreateTestOlapTable("olapTable");
+ lHelper.CreateTestOlapTable("olapTable", 2);
Cerr << "Wait tables" << Endl;
runtime.SimulateSleep(TDuration::Seconds(20));
Cerr << "Initialization tables" << Endl;
- Cerr << "Insert..." << Endl;
const TInstant pkStart = Now() - TDuration::Days(15);
ui32 idx = 0;
- lHelper.SendDataViaActorSystem("/Root/olapStore/olapTable", 0, (pkStart + TDuration::Seconds(2 * idx++)).GetValue(), 2000);
+
+ auto batch = lHelper.TestArrowBatch(0, (pkStart + TDuration::Seconds(2 * idx++)).GetValue(), 6000);
+ auto batchSize = NArrow::GetBatchDataSize(batch);
+ Cerr << "Inserting " << batchSize << " bytes..." << Endl;
+ UNIT_ASSERT(batchSize > 4 * 1024 * 1024); // NColumnShard::TLimits::MIN_BYTES_TO_INSERT
+ UNIT_ASSERT(batchSize < 8 * 1024 * 1024);
+
+ for (ui32 i = 0; i < 4; ++i) {
+ lHelper.SendDataViaActorSystem("/Root/olapStore/olapTable", batch);
+ }
{
const TInstant start = Now();
bool check = false;
@@ -513,7 +521,7 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) {
#endif
runtime.SimulateSleep(TDuration::Seconds(1));
}
- Y_VERIFY(check);
+ UNIT_ASSERT(check);
}
#ifdef S3_TEST_USAGE
Cerr << "storage initialized..." << Endl;
@@ -536,10 +544,10 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) {
#endif
runtime.SimulateSleep(TDuration::Seconds(1));
}
- Y_VERIFY(check);
+ UNIT_ASSERT(check);
}
#ifndef S3_TEST_USAGE
- Y_VERIFY(Singleton<NKikimr::NWrappers::NExternalStorage::TFakeExternalStorage>()->GetBucketsCount() == 1);
+ UNIT_ASSERT_EQUAL(Singleton<NKikimr::NWrappers::NExternalStorage::TFakeExternalStorage>()->GetBucketsCount(), 1);
#endif
}