aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorchertus <azuikov@ydb.tech>2023-06-30 18:23:22 +0300
committerchertus <azuikov@ydb.tech>2023-06-30 18:23:22 +0300
commit83e9195e30a0c2891f89d3cddf69b1c863cf3c52 (patch)
tree1739d19b1e9cc39781721385659942282fb426b1
parentb454398de8bd6fda1251382513891ede660e6755 (diff)
downloadydb-83e9195e30a0c2891f89d3cddf69b1c863cf3c52.tar.gz
fix TCompactedBlobsConstructor
-rw-r--r--ydb/core/tx/columnshard/engines/writer/blob_constructor.h8
-rw-r--r--ydb/core/tx/columnshard/engines/writer/compacted_blob_constructor.cpp65
-rw-r--r--ydb/core/tx/columnshard/engines/writer/compacted_blob_constructor.h18
-rw-r--r--ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp18
-rw-r--r--ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.h12
-rw-r--r--ydb/core/tx/columnshard/write_actor.cpp24
6 files changed, 82 insertions, 63 deletions
diff --git a/ydb/core/tx/columnshard/engines/writer/blob_constructor.h b/ydb/core/tx/columnshard/engines/writer/blob_constructor.h
index 7d4f720b484..d35d27d7109 100644
--- a/ydb/core/tx/columnshard/engines/writer/blob_constructor.h
+++ b/ydb/core/tx/columnshard/engines/writer/blob_constructor.h
@@ -32,9 +32,13 @@ public:
virtual ~IBlobConstructor() {}
virtual const TString& GetBlob() const = 0;
virtual bool RegisterBlobId(const TUnifiedBlobId& blobId) = 0;
- virtual EStatus BuildNext(NColumnShard::TUsage& resourceUsage, const TAppData& appData) = 0;
+ virtual EStatus BuildNext() = 0;
+ virtual NColumnShard::TUsage& GetResourceUsage() = 0;
- virtual TAutoPtr<NActors::IEventBase> BuildResult(NKikimrProto::EReplyStatus status, NColumnShard::TBlobBatch&& blobBatch, THashSet<ui32>&& yellowMoveChannels, THashSet<ui32>&& yellowStopChannels, const NColumnShard::TUsage& resourceUsage) = 0;
+ virtual TAutoPtr<NActors::IEventBase> BuildResult(
+ NKikimrProto::EReplyStatus status,
+ NColumnShard::TBlobBatch&& blobBatch,
+ THashSet<ui32>&& yellowMoveChannels, THashSet<ui32>&& yellowStopChannels) = 0;
};
}
diff --git a/ydb/core/tx/columnshard/engines/writer/compacted_blob_constructor.cpp b/ydb/core/tx/columnshard/engines/writer/compacted_blob_constructor.cpp
index 2c8c412fd56..06ab340ddc6 100644
--- a/ydb/core/tx/columnshard/engines/writer/compacted_blob_constructor.cpp
+++ b/ydb/core/tx/columnshard/engines/writer/compacted_blob_constructor.cpp
@@ -12,9 +12,15 @@ TCompactedBlobsConstructor::TCompactedBlobsConstructor(TAutoPtr<NColumnShard::TE
, Blobs(WriteIndexEv->Blobs)
, BlobGrouppingEnabled(blobGrouppingEnabled)
, CacheData(WriteIndexEv->CacheData)
+ , IsEviction(IndexChanges.PortionsToEvict.size() > 0)
{
- auto indexChanges = WriteIndexEv->IndexChanges;
- LastPortion = indexChanges->AppendedPortions.size() + indexChanges->PortionsToEvict.size();
+ if (IsEviction) {
+ Y_VERIFY(IndexChanges.AppendedPortions.empty());
+ LastPortion = IndexChanges.PortionsToEvict.size();
+ } else {
+ Y_VERIFY(IndexChanges.PortionsToEvict.empty());
+ LastPortion = IndexChanges.AppendedPortions.size();
+ }
Y_VERIFY(Blobs.size() > 0);
}
@@ -26,7 +32,7 @@ bool TCompactedBlobsConstructor::RegisterBlobId(const TUnifiedBlobId& blobId) {
Y_VERIFY(AccumulatedBlob.size() > 0);
Y_VERIFY(RecordsInBlob.size() > 0);
- auto& portionInfo = PortionUpdates.back();
+ auto& portionInfo = PortionUpdates.back();
LOG_S_TRACE("Write Index Blob " << blobId << " with " << RecordsInBlob.size() << " records");
for (const auto& rec : RecordsInBlob) {
size_t i = rec.first;
@@ -45,37 +51,32 @@ bool TCompactedBlobsConstructor::RegisterBlobId(const TUnifiedBlobId& blobId) {
return true;
}
-IBlobConstructor::EStatus TCompactedBlobsConstructor::BuildNext(NColumnShard::TUsage& resourceUsage, const TAppData& /*appData*/) {
- Y_UNUSED(resourceUsage);
- if (CurrentPortion == LastPortion) {
- Y_VERIFY(CurrentBlob == Blobs.size());
- return EStatus::Finished;
- }
-
+IBlobConstructor::EStatus TCompactedBlobsConstructor::BuildNext() {
AccumulatedBlob.clear();
RecordsInBlob.clear();
- if (CurrentPortionRecord == 0) {
- PortionUpdates.push_back(GetPortionInfo(CurrentPortion));
- // There could be eviction mix between normal eviction and eviction without data changes
- // TODO: better portions to blobs mathching
- const bool eviction = IndexChanges.PortionsToEvict.size() > 0;
- if (eviction) {
- while (CurrentPortion < LastPortion && !IndexChanges.PortionsToEvict[CurrentPortion].second.DataChanges) {
- ++CurrentPortion;
- PortionUpdates.push_back(GetPortionInfo(CurrentPortion));
- continue;
- }
- if (CurrentPortion == LastPortion) {
- return EStatus::Finished;
+ if (IsEviction && CurrentPortionRecord == 0) {
+ // Skip portions without data changes
+ for (; CurrentPortion < LastPortion; ++CurrentPortion) {
+ if (IndexChanges.PortionsToEvict[CurrentPortion].second.DataChanges) {
+ break;
}
+ PortionUpdates.push_back(GetPortionInfo(CurrentPortion));
}
}
- auto& portionInfo = GetPortionInfo(CurrentPortion);
+ if (CurrentPortion == LastPortion) {
+ Y_VERIFY(CurrentBlob == Blobs.size());
+ return EStatus::Finished;
+ }
+
+ const auto& portionInfo = GetPortionInfo(CurrentPortion);
+ if (CurrentPortionRecord == 0) {
+ PortionUpdates.push_back(portionInfo);
+ }
NOlap::TPortionInfo& newPortionInfo = PortionUpdates.back();
- const auto& records = portionInfo.Records;
+ const auto& records = portionInfo.Records;
for (; CurrentPortionRecord < records.size(); ++CurrentPortionRecord, ++CurrentBlob) {
Y_VERIFY(CurrentBlob < Blobs.size());
const TString& currentBlob = Blobs[CurrentBlob];
@@ -99,11 +100,14 @@ IBlobConstructor::EStatus TCompactedBlobsConstructor::BuildNext(NColumnShard::TU
return AccumulatedBlob.empty() ? EStatus::Finished : EStatus::Ok;
}
-TAutoPtr<IEventBase> TCompactedBlobsConstructor::BuildResult(NKikimrProto::EReplyStatus status, NColumnShard::TBlobBatch&& blobBatch, THashSet<ui32>&& yellowMoveChannels, THashSet<ui32>&& yellowStopChannels, const NColumnShard::TUsage& resourceUsage) {
+TAutoPtr<IEventBase> TCompactedBlobsConstructor::BuildResult(NKikimrProto::EReplyStatus status,
+ NColumnShard::TBlobBatch&& blobBatch,
+ THashSet<ui32>&& yellowMoveChannels,
+ THashSet<ui32>&& yellowStopChannels)
+{
for (ui64 index = 0; index < PortionUpdates.size(); ++index) {
const auto& portionInfo = PortionUpdates[index];
- const bool eviction = IndexChanges.PortionsToEvict.size() > 0;
- if (eviction) {
+ if (IsEviction) {
Y_VERIFY(index < IndexChanges.PortionsToEvict.size());
WriteIndexEv->IndexChanges->PortionsToEvict[index].first = portionInfo;
} else {
@@ -112,15 +116,14 @@ TAutoPtr<IEventBase> TCompactedBlobsConstructor::BuildResult(NKikimrProto::ERepl
}
}
- WriteIndexEv->ResourceUsage.Add(resourceUsage);
+ WriteIndexEv->ResourceUsage.Add(ResourceUsage);
WriteIndexEv->SetPutStatus(status, std::move(yellowMoveChannels), std::move(yellowStopChannels));
WriteIndexEv->BlobBatch = std::move(blobBatch);
return WriteIndexEv.Release();
}
const NOlap::TPortionInfo& TCompactedBlobsConstructor::GetPortionInfo(const ui64 index) const {
- bool eviction = IndexChanges.PortionsToEvict.size() > 0;
- if (eviction) {
+ if (IsEviction) {
Y_VERIFY(index < IndexChanges.PortionsToEvict.size());
return IndexChanges.PortionsToEvict[index].first;
} else {
diff --git a/ydb/core/tx/columnshard/engines/writer/compacted_blob_constructor.h b/ydb/core/tx/columnshard/engines/writer/compacted_blob_constructor.h
index aebb3d87a92..4b84584886a 100644
--- a/ydb/core/tx/columnshard/engines/writer/compacted_blob_constructor.h
+++ b/ydb/core/tx/columnshard/engines/writer/compacted_blob_constructor.h
@@ -9,30 +9,36 @@ namespace NKikimr::NOlap {
class TCompactedBlobsConstructor : public IBlobConstructor {
TAutoPtr<NColumnShard::TEvPrivate::TEvWriteIndex> WriteIndexEv;
+ NColumnShard::TUsage ResourceUsage;
const NOlap::TColumnEngineChanges& IndexChanges;
const std::vector<TString>& Blobs;
-
const bool BlobGrouppingEnabled;
const bool CacheData;
+ const bool IsEviction;
TString AccumulatedBlob;
std::vector<std::pair<size_t, TString>> RecordsInBlob;
+ std::vector<NOlap::TPortionInfo> PortionUpdates;
ui64 CurrentPortion = 0;
ui64 LastPortion = 0;
-
ui64 CurrentBlob = 0;
ui64 CurrentPortionRecord = 0;
- TVector<NOlap::TPortionInfo> PortionUpdates;
-
public:
TCompactedBlobsConstructor(TAutoPtr<NColumnShard::TEvPrivate::TEvWriteIndex> writeIndexEv, bool blobGrouppingEnabled);
const TString& GetBlob() const override;
bool RegisterBlobId(const TUnifiedBlobId& blobId) override;
- EStatus BuildNext(NColumnShard::TUsage& resourceUsage, const TAppData& /*appData*/) override;
+ EStatus BuildNext() override;
+
+ NColumnShard::TUsage& GetResourceUsage() override {
+ return ResourceUsage;
+ }
- TAutoPtr<IEventBase> BuildResult(NKikimrProto::EReplyStatus status, NColumnShard::TBlobBatch&& blobBatch, THashSet<ui32>&& yellowMoveChannels, THashSet<ui32>&& yellowStopChannels, const NColumnShard::TUsage& resourceUsage) override;
+ TAutoPtr<IEventBase> BuildResult(
+ NKikimrProto::EReplyStatus status,
+ NColumnShard::TBlobBatch&& blobBatch,
+ THashSet<ui32>&& yellowMoveChannels, THashSet<ui32>&& yellowStopChannels) override;
private:
const NOlap::TPortionInfo& GetPortionInfo(const ui64 index) const;
diff --git a/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp b/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp
index cbaf9b21f5a..3a663d4215d 100644
--- a/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp
+++ b/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp
@@ -15,7 +15,7 @@ const TString& TIndexedBlobConstructor::GetBlob() const {
return DataPrepared;
}
-IBlobConstructor::EStatus TIndexedBlobConstructor::BuildNext(NColumnShard::TUsage& resourceUsage, const TAppData& appData) {
+IBlobConstructor::EStatus TIndexedBlobConstructor::BuildNext() {
if (!!DataPrepared) {
return EStatus::Finished;
}
@@ -37,7 +37,7 @@ IBlobConstructor::EStatus TIndexedBlobConstructor::BuildNext(NColumnShard::TUsag
// Heavy operations inside. We cannot run them in tablet event handler.
TString strError;
{
- NColumnShard::TCpuGuard guard(resourceUsage);
+ NColumnShard::TCpuGuard guard(ResourceUsage);
Batch = SnapshotSchema->PrepareForInsert(data, serializedScheme, strError);
}
if (!Batch) {
@@ -46,7 +46,7 @@ IBlobConstructor::EStatus TIndexedBlobConstructor::BuildNext(NColumnShard::TUsag
}
{
- NColumnShard::TCpuGuard guard(resourceUsage);
+ NColumnShard::TCpuGuard guard(ResourceUsage);
DataPrepared = NArrow::SerializeBatchNoCompression(Batch);
}
@@ -54,8 +54,8 @@ IBlobConstructor::EStatus TIndexedBlobConstructor::BuildNext(NColumnShard::TUsag
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "ev_write_data_too_big")("write_id", writeId)("path_id", pathId);
return EStatus::Error;
}
-
- ui64 dirtyTime = appData.TimeProvider->Now().Seconds();
+
+ ui64 dirtyTime = TAppData::TimeProvider->Now().Seconds();
Y_VERIFY(dirtyTime);
NKikimrTxColumnShard::TLogicalMetadata outMeta;
outMeta.SetNumRows(Batch->num_rows());
@@ -78,14 +78,18 @@ bool TIndexedBlobConstructor::RegisterBlobId(const TUnifiedBlobId& blobId) {
return true;
}
-TAutoPtr<IEventBase> TIndexedBlobConstructor::BuildResult(NKikimrProto::EReplyStatus status, NColumnShard::TBlobBatch&& blobBatch, THashSet<ui32>&& yellowMoveChannels, THashSet<ui32>&& yellowStopChannels, const NColumnShard::TUsage& resourceUsage) {
+TAutoPtr<IEventBase> TIndexedBlobConstructor::BuildResult(NKikimrProto::EReplyStatus status,
+ NColumnShard::TBlobBatch&& blobBatch,
+ THashSet<ui32>&& yellowMoveChannels,
+ THashSet<ui32>&& yellowStopChannels)
+{
WriteEv->WrittenBatch = Batch;
auto& record = Proto(WriteEv.Get());
record.SetData(DataPrepared); // modify for TxWrite
record.MutableMeta()->SetLogicalMeta(MetaString);
- WriteEv->ResourceUsage.Add(resourceUsage);
+ WriteEv->ResourceUsage.Add(ResourceUsage);
WriteEv->SetPutStatus(status, std::move(yellowMoveChannels), std::move(yellowStopChannels));
WriteEv->BlobBatch = std::move(blobBatch);
return WriteEv.Release();
diff --git a/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.h b/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.h
index 7ff9e385ab9..bba47550ada 100644
--- a/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.h
+++ b/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.h
@@ -10,6 +10,7 @@ namespace NKikimr::NOlap {
class TIndexedBlobConstructor : public IBlobConstructor {
TAutoPtr<TEvColumnShard::TEvWrite> WriteEv;
NOlap::ISnapshotSchema::TPtr SnapshotSchema;
+ NColumnShard::TUsage ResourceUsage;
TString DataPrepared;
TString MetaString;
@@ -20,10 +21,17 @@ public:
TIndexedBlobConstructor(TAutoPtr<TEvColumnShard::TEvWrite> writeEv, NOlap::ISnapshotSchema::TPtr snapshotSchema);
const TString& GetBlob() const override;
- EStatus BuildNext(NColumnShard::TUsage& resourceUsage, const TAppData& appData) override;
+ EStatus BuildNext() override;
bool RegisterBlobId(const TUnifiedBlobId& blobId) override;
- TAutoPtr<NActors::IEventBase> BuildResult(NKikimrProto::EReplyStatus status, NColumnShard::TBlobBatch&& blobBatch, THashSet<ui32>&& yellowMoveChannels, THashSet<ui32>&& yellowStopChannels, const NColumnShard::TUsage& resourceUsage) override;
+ NColumnShard::TUsage& GetResourceUsage() override {
+ return ResourceUsage;
+ }
+
+ TAutoPtr<NActors::IEventBase> BuildResult(
+ NKikimrProto::EReplyStatus status,
+ NColumnShard::TBlobBatch&& blobBatch,
+ THashSet<ui32>&& yellowMoveChannels, THashSet<ui32>&& yellowStopChannels) override;
};
}
diff --git a/ydb/core/tx/columnshard/write_actor.cpp b/ydb/core/tx/columnshard/write_actor.cpp
index aaba96fe98b..7b85892a59b 100644
--- a/ydb/core/tx/columnshard/write_actor.cpp
+++ b/ydb/core/tx/columnshard/write_actor.cpp
@@ -14,11 +14,10 @@ namespace {
class TWriteActor : public TActorBootstrapped<TWriteActor> {
ui64 TabletId;
TActorId DstActor;
- TUsage ResourceUsage;
TBlobBatch BlobBatch;
NOlap::IBlobConstructor::TPtr BlobsConstructor;
-
+
THashSet<ui32> YellowMoveChannels;
THashSet<ui32> YellowStopChannels;
TInstant Deadline;
@@ -37,7 +36,7 @@ public:
, Deadline(deadline)
, MaxSmallBlobSize(maxSmallBlobSize)
{}
-
+
void Handle(TEvBlobStorage::TEvPutResult::TPtr& ev, const TActorContext& ctx) {
TEvBlobStorage::TEvPutResult* msg = ev->Get();
auto status = msg->Status;
@@ -76,10 +75,9 @@ public:
putStatus = NKikimrProto::TIMEOUT;
}
}
- auto ev = BlobsConstructor->BuildResult(putStatus, std::move(BlobBatch),
+ auto ev = BlobsConstructor->BuildResult(putStatus, std::move(BlobBatch),
std::move(YellowMoveChannels),
- std::move(YellowStopChannels),
- ResourceUsage);
+ std::move(YellowStopChannels));
ctx.Send(DstActor, ev.Release());
Die(ctx);
}
@@ -94,20 +92,16 @@ public:
const TDuration timeout = Deadline - now;
ctx.Schedule(timeout, new TEvents::TEvWakeup());
}
-
+
auto status = NOlap::IBlobConstructor::EStatus::Finished;
- while (true) {
- status = BlobsConstructor->BuildNext(ResourceUsage, *AppData(ctx));
- if (status != NOlap::IBlobConstructor::EStatus::Ok) {
- break;
- }
+ while (BlobsConstructor->BuildNext() == NOlap::IBlobConstructor::EStatus::Ok) {
auto blobId = SendWriteBlobRequest(BlobsConstructor->GetBlob(), ctx);
BlobsConstructor->RegisterBlobId(blobId);
-
}
if (status != NOlap::IBlobConstructor::EStatus::Finished) {
return SendResultAndDie(ctx, NKikimrProto::ERROR);
}
+
if (BlobBatch.AllBlobWritesCompleted()) {
return SendResultAndDie(ctx, NKikimrProto::OK);
}
@@ -122,10 +116,10 @@ public:
break;
}
}
-
+
private:
TUnifiedBlobId SendWriteBlobRequest(const TString& data, const TActorContext& ctx) {
- ResourceUsage.Network += data.size();
+ BlobsConstructor->GetResourceUsage().Network += data.size();
if (MaxSmallBlobSize && data.size() <= *MaxSmallBlobSize) {
TUnifiedBlobId smallBlobId = BlobBatch.AddSmallBlob(data);
Y_VERIFY(smallBlobId.IsSmallBlob());