diff options
author | chertus <azuikov@ydb.tech> | 2023-06-16 12:04:30 +0300 |
---|---|---|
committer | chertus <azuikov@ydb.tech> | 2023-06-16 12:04:30 +0300 |
commit | b19fb21198fe8919ac76d17c392bf4a2ed7db7f2 (patch) | |
tree | fac8771fa1299dd7f7732bb49cd8f958be3eaf02 | |
parent | 1ea35d0010fa056401642b5555cac149b3ef4690 (diff) | |
download | ydb-b19fb21198fe8919ac76d17c392bf4a2ed7db7f2.tar.gz |
fix exported blobs lifecycle (cache and forget after use)
-rw-r--r-- | ydb/core/tx/columnshard/blob_manager.cpp | 10 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard.cpp | 6 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard_impl.cpp | 4 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard_ut_common.h | 24 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp | 248 |
5 files changed, 216 insertions, 76 deletions
diff --git a/ydb/core/tx/columnshard/blob_manager.cpp b/ydb/core/tx/columnshard/blob_manager.cpp index f1e7819a06..580adaba6f 100644 --- a/ydb/core/tx/columnshard/blob_manager.cpp +++ b/ydb/core/tx/columnshard/blob_manager.cpp @@ -536,7 +536,9 @@ void TBlobManager::DeleteBlob(const TUnifiedBlobId& blobId, IBlobManagerDb& db) BlobsManagerCounters.OnDeleteBlobMarker(blobId.BlobSize()); BlobsManagerCounters.OnBlobsDelete(BlobsToDelete); } - NBlobCache::ForgetBlob(blobId); + if (!EvictedBlobs.contains(TEvictedBlob{.Blob = blobId})) { + NBlobCache::ForgetBlob(blobId); + } } else { BlobsManagerCounters.OnDeleteBlobDelayedMarker(blobId.BlobSize()); LOG_S_DEBUG("BlobManager at tablet " << TabletInfo->TabletID << " Delay Delete Blob " << blobId); @@ -725,6 +727,7 @@ void TBlobManager::SetBlobInUse(const TUnifiedBlobId& blobId, bool inUse) { return; } + LOG_S_DEBUG("BlobManager at tablet " << TabletInfo->TabletID << " Blob " << blobId << " is no longer in use"); BlobsUseCount.erase(useIt); // Check if the blob is marked for delayed deletion @@ -742,7 +745,10 @@ void TBlobManager::SetBlobInUse(const TUnifiedBlobId& blobId, bool inUse) { BlobsToDelete.insert(logoBlobId); BlobsManagerCounters.OnBlobsDelete(BlobsToDelete); BlobsManagerCounters.OnDeleteBlobMarker(logoBlobId.BlobSize()); - NBlobCache::ForgetBlob(blobId); + + if (!EvictedBlobs.contains(TEvictedBlob{.Blob = blobId})) { + NBlobCache::ForgetBlob(blobId); + } } } } diff --git a/ydb/core/tx/columnshard/columnshard.cpp b/ydb/core/tx/columnshard/columnshard.cpp index 62b978efd1..f03293da5f 100644 --- a/ydb/core/tx/columnshard/columnshard.cpp +++ b/ydb/core/tx/columnshard/columnshard.cpp @@ -121,6 +121,12 @@ void TColumnShard::Handle(TEvPrivate::TEvReadFinished::TPtr& ev, const TActorCon ScanTxInFlight.erase(txId); SetCounter(COUNTER_SCAN_IN_FLY, ScanTxInFlight.size()); } + + { // Cleanup just freed dropped exported blobs + THashSet<NOlap::TEvictedBlob> blobsToForget; + BlobManager->GetCleanupBlobs(blobsToForget); + ForgetBlobs(ctx, blobsToForget); + } } void TColumnShard::Handle(TEvPrivate::TEvPeriodicWakeup::TPtr& ev, const TActorContext& ctx) { diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp index aa574c533b..e8e924a202 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.cpp +++ b/ydb/core/tx/columnshard/columnshard_impl.cpp @@ -953,7 +953,7 @@ void TColumnShard::ForgetBlobs(const TActorContext& ctx, const THashSet<NOlap::T for (const auto& ev : evictedBlobs) { auto& blobId = ev.Blob; if (BlobManager->BlobInUse(blobId)) { - LOG_S_DEBUG("Blob '" << blobId.ToStringNew() << "' in use at tablet " << TabletID()); + LOG_S_DEBUG("Blob '" << blobId.ToStringNew() << "' is in use at tablet " << TabletID()); strBlobsDelayed << "'" << blobId.ToStringNew() << "' "; continue; } @@ -969,7 +969,7 @@ void TColumnShard::ForgetBlobs(const TActorContext& ctx, const THashSet<NOlap::T tierBlobs[meta.GetTierName()].emplace_back(std::move(evict)); } else { Y_VERIFY(evict.Blob == blobId); - strBlobsDelayed << "'"<< blobId.ToStringNew() << "' "; + strBlobsDelayed << "'" << blobId.ToStringNew() << "' "; } } diff --git a/ydb/core/tx/columnshard/columnshard_ut_common.h b/ydb/core/tx/columnshard/columnshard_ut_common.h index bb061fda4d..6f994b27dc 100644 --- a/ydb/core/tx/columnshard/columnshard_ut_common.h +++ b/ydb/core/tx/columnshard/columnshard_ut_common.h @@ -77,6 +77,30 @@ struct TTestSchema { TtlColumn = columnName; return *this; } + + static NKikimrSchemeOp::TS3Settings FakeS3() { + const TString bucket = "tiering-test-01"; + + NKikimrSchemeOp::TS3Settings s3Config; + s3Config.SetScheme(NKikimrSchemeOp::TS3Settings::HTTP); + s3Config.SetVerifySSL(false); + s3Config.SetBucket(bucket); +//#define S3_TEST_USAGE +#ifdef S3_TEST_USAGE + s3Config.SetEndpoint("storage.cloud-preprod.yandex.net"); + s3Config.SetAccessKey("..."); + s3Config.SetSecretKey("..."); + s3Config.SetProxyHost("localhost"); + s3Config.SetProxyPort(8080); + s3Config.SetProxyScheme(NKikimrSchemeOp::TS3Settings::HTTP); +#else + s3Config.SetEndpoint("fake"); +#endif + s3Config.SetRequestTimeoutMs(10000); + s3Config.SetHttpRequestTimeoutMs(10000); + s3Config.SetConnectionTimeoutMs(10000); + return s3Config; + } }; struct TTableSpecials : public TStorageTier { diff --git a/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp b/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp index 74c452a487..00fc60be19 100644 --- a/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp +++ b/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp @@ -368,46 +368,64 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {}, class TCountersContainer { private: - ui32 SuccessCounterStart = 0; + struct TCounters { + ui32 Attempt = 0; + ui32 Request = 0; + ui32 Response = 0; + ui32 Success = 0; + + void Clear() { + Attempt = 0; + Request = 0; + Response = 0; + Success = 0; + } + + TString ToString() const { + return TStringBuilder() << Attempt << "/" << Request << "/" << Response << "/" << Success; + } + }; + + ui32 WaitNo = 0; + public: - ui32 UnknownsCounter = 0; - ui32 SuccessCounter = 0; - ui32 ErrorsCounter = 0; - ui32 ResponsesCounter = 0; + TCounters ExportCounters; + TCounters ForgetCounters; ui32 CaptureReadEvents = 0; std::vector<TAutoPtr<IEventHandle>> CapturedReads; - TString SerializeToString() const { - TStringBuilder sb; - sb << "EXPORTS INFO: " << SuccessCounter << "/" << ErrorsCounter << "/" << UnknownsCounter << "/" << ResponsesCounter; - return sb; - } - - void WaitEvents(TTestBasicRuntime& runtime, const ui32 attemption, const ui32 expectedDeltaSuccess, const TDuration timeout) { + void WaitEvents(TTestBasicRuntime& runtime, const TDuration& timeout, ui32 waitExports, ui32 waitForgets, + const TString& promo = "START_WAITING") { const TInstant startInstant = TAppData::TimeProvider->Now(); const TInstant deadline = startInstant + timeout; - Cerr << "START_WAITING(" << attemption << "): " << SerializeToString() << Endl; + Cerr << promo << "(" << WaitNo << "): " + << "E" << ExportCounters.ToString() << " F" << ForgetCounters.ToString() << Endl; while (TAppData::TimeProvider->Now() < deadline) { - Cerr << "IN_WAITING(" << attemption << "):" << SerializeToString() << Endl; + Cerr << "IN_WAITING(" << WaitNo << "): " + << "E" << ExportCounters.ToString() << " F" << ForgetCounters.ToString() << Endl; runtime.SimulateSleep(TDuration::Seconds(1)); - UNIT_ASSERT(ErrorsCounter == 0); - if (expectedDeltaSuccess) { - if (SuccessCounter >= SuccessCounterStart + expectedDeltaSuccess) { - break; - } - } else { - if (SuccessCounter > SuccessCounterStart) { - break; - } + + if (!waitExports && ExportCounters.Success + || !waitForgets && ForgetCounters.Success + || !waitForgets && ExportCounters.Success >= waitExports + || !waitExports && ForgetCounters.Success >= waitForgets + || waitExports && waitForgets + && ExportCounters.Success >= waitExports && ForgetCounters.Success >= waitForgets) { + break; } } - if (expectedDeltaSuccess) { - UNIT_ASSERT(SuccessCounter >= SuccessCounterStart + expectedDeltaSuccess); - } else { - UNIT_ASSERT_VALUES_EQUAL(SuccessCounter, SuccessCounterStart); - } - Cerr << "FINISH_WAITING(" << attemption << "): " << SerializeToString() << Endl; - SuccessCounterStart = SuccessCounter; + Cerr << "FINISH_WAITING(" << WaitNo << "): " + << "E" << ExportCounters.ToString() << " F" << ForgetCounters.ToString() << Endl; + UNIT_ASSERT_VALUES_EQUAL(ExportCounters.Success, waitExports); + UNIT_ASSERT_VALUES_EQUAL(ForgetCounters.Success, waitForgets); + ExportCounters.Clear(); + ForgetCounters.Clear(); + ++WaitNo; + } + + void WaitMoreEvents(TTestBasicRuntime& runtime, const TDuration& timeout, ui32 waitExports, ui32 waitForgets) { + --WaitNo; + WaitEvents(runtime, timeout, waitExports, waitForgets, "CONTINUE_WAITING"); } void WaitReadsCaptured(TTestBasicRuntime& runtime) const { @@ -447,12 +465,27 @@ public: bool operator()(TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) { TStringBuilder ss; if (auto* msg = TryGetPrivateEvent<NColumnShard::TEvPrivate::TEvExport>(ev)) { - ss << "EXPORT(" << ++Counters->SuccessCounter << "): " << NKikimrProto::EReplyStatus_Name(msg->Status); + if (msg->Status == NKikimrProto::OK) { + ss << "EXPORT(done " << ++Counters->ExportCounters.Success << "): "; + } else { + ss << "EXPORT(attempt " << ++Counters->ExportCounters.Attempt << "): " + << NKikimrProto::EReplyStatus_Name(msg->Status); + } + } else if (auto* msg = TryGetPrivateEvent<NColumnShard::TEvPrivate::TEvForget>(ev)) { + if (msg->Status == NKikimrProto::OK) { + ss << "FORGET(done " << ++Counters->ForgetCounters.Success << "): "; + } else { + ss << "FORGET(attempt " << ++Counters->ForgetCounters.Attempt << "): " + << NKikimrProto::EReplyStatus_Name(msg->Status); + } + } else if (auto* msg = TryGetPrivateEvent<NWrappers::NExternalStorage::TEvPutObjectRequest>(ev)) { + ss << "S3_REQ(put " << ++Counters->ExportCounters.Request << "):"; } else if (auto* msg = TryGetPrivateEvent<NWrappers::NExternalStorage::TEvPutObjectResponse>(ev)) { - ss << "S3_RESPONSE(put " << ++Counters->ResponsesCounter << "):"; + ss << "S3_RESPONSE(put " << ++Counters->ExportCounters.Response << "):"; + } else if (auto* msg = TryGetPrivateEvent<NWrappers::NExternalStorage::TEvDeleteObjectRequest>(ev)) { + ss << "S3_REQ(delete " << ++Counters->ForgetCounters.Request << "):"; } else if (auto* msg = TryGetPrivateEvent<NWrappers::NExternalStorage::TEvDeleteObjectResponse>(ev)) { - ss << "(" << ++Counters->SuccessCounter << "): DELETE SUCCESS"; - ss << "S3_RESPONSE(delete " << ++Counters->ResponsesCounter << "):"; + ss << "S3_RESPONSE(delete " << ++Counters->ForgetCounters.Response << "):"; } else if (auto* msg = TryGetPrivateEvent<NBlobCache::TEvBlobCache::TEvReadBlobRange>(ev)) { if (Counters->CaptureReadEvents) { Cerr << "CAPTURE " << msg->BlobRange.ToString() << " " @@ -469,17 +502,18 @@ public: ss << " " << ev->Sender << "->" << ev->Recipient; Cerr << ss << Endl; return false; - }; + } }; std::vector<std::pair<ui32, ui64>> TestTiers(bool reboots, const std::vector<TString>& blobs, const std::vector<TTestSchema::TTableSpecials>& specs, - const ui32 initialEviction) + const THashSet<ui32>& exportSteps, const THashSet<ui32>& forgetSteps) { TTestBasicRuntime runtime; TTester::Setup(runtime); - runtime.SetLogPriority(NKikimrServices::TX_COLUMNSHARD, NActors::NLog::PRI_DEBUG); + runtime.SetLogPriority(NKikimrServices::TX_COLUMNSHARD, NActors::NLog::PRI_INFO); + runtime.SetLogPriority(NKikimrServices::TX_COLUMNSHARD_SCAN, NActors::NLog::PRI_INFO); TActorId sender = runtime.AllocateEdgeActor(); CreateTestBootstrapper(runtime, @@ -509,6 +543,7 @@ std::vector<std::pair<ui32, ui64>> TestTiers(bool reboots, const std::vector<TSt ui64 tableId = 1; ui64 planStep = 1000000000; // greater then delays ui64 txId = 100; + const TDuration exportTimeout = TDuration::Seconds(40); UNIT_ASSERT(specs.size() > 0); { @@ -532,6 +567,8 @@ std::vector<std::pair<ui32, ui64>> TestTiers(bool reboots, const std::vector<TSt RebootTablet(runtime, TTestTxConfig::TxTablet0, sender); } + runtime.SetLogPriority(NKikimrServices::TX_COLUMNSHARD, NActors::NLog::PRI_DEBUG); + TAutoPtr<IEventHandle> handle; std::vector<std::pair<ui32, ui64>> specRowsBytes; @@ -540,9 +577,11 @@ std::vector<std::pair<ui32, ui64>> TestTiers(bool reboots, const std::vector<TSt TCountersContainer counter; runtime.SetEventFilter(TEventsCounter(counter, runtime)); for (ui32 i = 0; i < specs.size(); ++i) { + ui32 numExports = exportSteps.contains(i) ? 1 : 0; + ui32 numForgets = forgetSteps.contains(i) ? 1 : 0; bool hasColdEviction = false; - for (auto&& i : specs[i].Tiers) { - if (!!i.S3) { + for (auto&& spec : specs[i].Tiers) { + if (!!spec.S3) { hasColdEviction = true; break; } @@ -576,17 +615,17 @@ std::vector<std::pair<ui32, ui64>> TestTiers(bool reboots, const std::vector<TSt TriggerTTL(runtime, sender, NOlap::TSnapshot(++planStep, ++txId), {}, 0, specs[i].TtlColumn); Cerr << (hasColdEviction ? "Cold" : "Hot") - << " tiering, spec " << i << ", num tiers: " << specs[i].Tiers.size() << "\n"; + << " tiering, spec " << i << ", num tiers: " << specs[i].Tiers.size() + << ", exports: " << numExports << ", forgets: " << numForgets << Endl; - if (hasColdEviction) { - if (i > initialEviction) { - counter.WaitEvents(runtime, i, 1, TDuration::Seconds(40)); - } else { - counter.WaitEvents(runtime, i, 0, TDuration::Seconds(20)); - } + if (numExports) { + UNIT_ASSERT(hasColdEviction); + counter.WaitEvents(runtime, exportTimeout, numExports, 0); } else { - counter.WaitEvents(runtime, i, 0, TDuration::Seconds(4)); + TDuration timeout = hasColdEviction ? TDuration::Seconds(10) : TDuration::Seconds(4); + counter.WaitEvents(runtime, timeout, 0, 0); } + if (reboots) { ProvideTieringSnapshot(runtime, sender, TTestSchema::BuildSnapshot(specs[i])); } @@ -649,6 +688,12 @@ std::vector<std::pair<ui32, ui64>> TestTiers(bool reboots, const std::vector<TSt if (reboots) { RebootTablet(runtime, TTestTxConfig::TxTablet0, sender); } + + // TODO: move upper + if (numForgets) { + UNIT_ASSERT(hasColdEviction); + counter.WaitMoreEvents(runtime, exportTimeout, 0, numForgets); + } } return specRowsBytes; @@ -752,13 +797,32 @@ std::vector<std::pair<ui32, ui64>> TestTiersAndTtl(const TTestSchema::TTableSpec size_t initialEviction = alters.size(); TEvictionChanges changes; + THashSet<ui32> exports; + THashSet<ui32> forgets; if (testTtl) { changes.AddTtlAlters(spec, {allowBoth, allowOne, allowNone}, alters); } else { changes.AddTierAlters(spec, {allowBoth, allowOne, allowNone}, alters); + + for (ui32 i = initialEviction + 1; i < alters.size() - 1; ++i) { + for (auto& tier : alters[i].Tiers) { + if (tier.S3) { + exports.emplace(i); + break; + } + } + } + for (ui32 i = initialEviction + 2; i < alters.size(); ++i) { + for (auto& tier : alters[i].Tiers) { + if (tier.S3) { + forgets.emplace(i); + break; + } + } + } } - auto rowsBytes = TestTiers(reboots, blobs, alters, initialEviction); + auto rowsBytes = TestTiers(reboots, blobs, alters, exports, forgets); for (auto&& i : rowsBytes) { Cerr << i.first << "/" << i.second << Endl; } @@ -771,6 +835,38 @@ std::vector<std::pair<ui32, ui64>> TestTiersAndTtl(const TTestSchema::TTableSpec return rowsBytes; } +std::vector<std::pair<ui32, ui64>> TestOneTierExport(const TTestSchema::TTableSpecials& spec, bool reboots) { + const std::vector<ui64> ts = { 1600000000, 1620000000 }; + + ui32 overlapSize = 0; + std::vector<TString> blobs = MakeData(ts, PORTION_ROWS, overlapSize, spec.TtlColumn); + + TInstant now = TAppData::TimeProvider->Now(); + TDuration allowBoth = TDuration::Seconds(now.Seconds() - ts[0] + 600); + TDuration allowOne = TDuration::Seconds(now.Seconds() - ts[1] + 600); + TDuration allowNone = TDuration::Seconds(now.Seconds() - ts[1] - 600); + + std::vector<TTestSchema::TTableSpecials> alters = { TTestSchema::TTableSpecials() }; + + TEvictionChanges changes; + changes.AddTierAlters(spec, {allowBoth, allowOne, allowNone}, alters); + UNIT_ASSERT_VALUES_EQUAL(alters.size(), 4); + + // TODO: Add error in config => eviction + not finished export + //UNIT_ASSERT_VALUES_EQUAL(alters[1].Tiers.size(), 1); + //UNIT_ASSERT(alters[1].Tiers[0].S3); + //alters[1].Tiers[0].S3->SetEndpoint("wrong"); + + auto rowsBytes = TestTiers(reboots, blobs, alters, {1}, {2, 3}); + for (auto&& i : rowsBytes) { + Cerr << i.first << "/" << i.second << Endl; + } + + UNIT_ASSERT_EQUAL(rowsBytes.size(), alters.size()); + // TODO + return rowsBytes; +} + void TestTwoHotTiers(bool reboot, bool changeTtl, const EInitialEviction initial = EInitialEviction::None) { TTestSchema::TTableSpecials spec; spec.SetTtlColumn("timestamp"); @@ -802,7 +898,6 @@ void TestTwoHotTiers(bool reboot, bool changeTtl, const EInitialEviction initial } void TestHotAndColdTiers(bool reboot, const EInitialEviction initial) { - const TString bucket = "tiering-test-01"; TPortManager portManager; const ui16 port = portManager.GetPort(); @@ -813,32 +908,26 @@ void TestHotAndColdTiers(bool reboot, const EInitialEviction initial) { spec.SetTtlColumn("timestamp"); spec.Tiers.emplace_back(TTestSchema::TStorageTier("tier0").SetTtlColumn("timestamp")); spec.Tiers.emplace_back(TTestSchema::TStorageTier("tier1").SetTtlColumn("timestamp")); - spec.Tiers.back().S3 = NKikimrSchemeOp::TS3Settings(); - auto& s3Config = *spec.Tiers.back().S3; - { - - s3Config.SetScheme(NKikimrSchemeOp::TS3Settings::HTTP); - s3Config.SetVerifySSL(false); - s3Config.SetBucket(bucket); -//#define S3_TEST_USAGE -#ifdef S3_TEST_USAGE - s3Config.SetEndpoint("storage.cloud-preprod.yandex.net"); - s3Config.SetAccessKey("..."); - s3Config.SetSecretKey("..."); - s3Config.SetProxyHost("localhost"); - s3Config.SetProxyPort(8080); - s3Config.SetProxyScheme(NKikimrSchemeOp::TS3Settings::HTTP); -#else - s3Config.SetEndpoint("fake"); -#endif - s3Config.SetRequestTimeoutMs(10000); - s3Config.SetHttpRequestTimeoutMs(10000); - s3Config.SetConnectionTimeoutMs(10000); - } + spec.Tiers.back().S3 = TTestSchema::TStorageTier::FakeS3(); TestTiersAndTtl(spec, reboot, initial); } +void TestExport(bool reboot) { + TPortManager portManager; + const ui16 port = portManager.GetPort(); + + TS3Mock s3Mock({}, TS3Mock::TSettings(port)); + UNIT_ASSERT(s3Mock.Start()); + + TTestSchema::TTableSpecials spec; + spec.SetTtlColumn("timestamp"); + spec.Tiers.emplace_back(TTestSchema::TStorageTier("cold").SetTtlColumn("timestamp")); + spec.Tiers.back().S3 = TTestSchema::TStorageTier::FakeS3(); + + TestOneTierExport(spec, reboot); +} + void TestDrop(bool reboots) { TTestBasicRuntime runtime; TTester::Setup(runtime); @@ -1168,7 +1257,22 @@ Y_UNIT_TEST_SUITE(TColumnShardTestSchema) { TestHotAndColdTiers(true, EInitialEviction::Ttl); } - // TODO: EnableTtlAfterColdTiers + Y_UNIT_TEST(OneColdTier) { + TestExport(false); + } + + Y_UNIT_TEST(RebootOneColdTier) { + TestExport(true); + } + + // TODO: ExportAfterFail + // TODO: ForgetAfterFail + + // TODO: LastTierBorderIsTtl = false + + // TODO: DisableTierAfterExport + // TODO: ReenableTierAfterExport + // TODO: AlterTierBorderAfterExport Y_UNIT_TEST(Drop) { TestDrop(false); |