diff options
author | ivanmorozov <[email protected]> | 2023-09-29 12:40:25 +0300 |
---|---|---|
committer | ivanmorozov <[email protected]> | 2023-09-29 13:15:53 +0300 |
commit | d1b6c34393585eae85b13c71f4ca6125ad908ddd (patch) | |
tree | 837bd729c1ad81914707922df586700afaab9c46 | |
parent | 9f05ad4a7c0f811fd2f7b6de45c1e1e7c3b29903 (diff) |
KIKIMR-19211: dont store original blob after unpack
-rw-r--r-- | ydb/core/tx/columnshard/columnshard__write.cpp | 2 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/operations/slice_builder.cpp | 4 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/operations/write_data.cpp | 16 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/operations/write_data.h | 24 | ||||
-rw-r--r-- | ydb/core/tx/ev_write/write_data.cpp | 6 | ||||
-rw-r--r-- | ydb/core/tx/ev_write/write_data.h | 13 |
6 files changed, 42 insertions, 23 deletions
diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp index 017df46d15e..802f6fdbd15 100644 --- a/ydb/core/tx/columnshard/columnshard__write.cpp +++ b/ydb/core/tx/columnshard/columnshard__write.cpp @@ -215,7 +215,7 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor return; } - auto wg = WritesMonitor.RegisterWrite(arrowData->GetData().size()); + auto wg = WritesMonitor.RegisterWrite(arrowData->GetSize()); auto operation = OperationsManager.RegisterOperation(txId); Y_VERIFY(operation); operation->Start(*this, tableId, arrowData, source, ctx); diff --git a/ydb/core/tx/columnshard/operations/slice_builder.cpp b/ydb/core/tx/columnshard/operations/slice_builder.cpp index 6988c621f1a..14792dacbbf 100644 --- a/ydb/core/tx/columnshard/operations/slice_builder.cpp +++ b/ydb/core/tx/columnshard/operations/slice_builder.cpp @@ -12,7 +12,7 @@ std::optional<std::vector<NKikimr::NArrow::TSerializedBatch>> TBuildSlicesTask:: const ui64 tableId = writeMeta.GetTableId(); const ui64 writeId = writeMeta.GetWriteId(); - std::shared_ptr<arrow::RecordBatch> batch = WriteData.GetData().GetArrowBatch(); + std::shared_ptr<arrow::RecordBatch> batch = WriteData.GetDataPtr()->ExtractBatch(); if (!batch) { AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "ev_write_bad_data")("write_id", writeId)("table_id", tableId); @@ -34,7 +34,9 @@ std::optional<std::vector<NKikimr::NArrow::TSerializedBatch>> TBuildSlicesTask:: } bool TBuildSlicesTask::DoExecute() { + WriteData.MutableWriteMeta().SetWriteMiddle2StartInstant(TMonotonic::Now()); auto batches = BuildSlices(); + WriteData.MutableWriteMeta().SetWriteMiddle3StartInstant(TMonotonic::Now()); if (batches) { auto writeController = std::make_shared<NOlap::TIndexedWriteController>(ParentActorId, WriteData, Action, std::move(*batches)); if (batches && Action->NeedDraftTransaction()) { diff --git a/ydb/core/tx/columnshard/operations/write_data.cpp b/ydb/core/tx/columnshard/operations/write_data.cpp index 18e1d4e7bd1..dcbf58c9a28 100644 --- a/ydb/core/tx/columnshard/operations/write_data.cpp +++ b/ydb/core/tx/columnshard/operations/write_data.cpp @@ -13,11 +13,15 @@ bool TArrowData::Parse(const NKikimrDataEvents::TOperationData& proto, const IPa columns.emplace_back(columnId); } BatchSchema = std::make_shared<NOlap::TFilteredSnapshotSchema>(IndexSchema, columns); + OriginalDataSize = IncomingData.size(); return BatchSchema->GetColumnsCount() == columns.size() && !IncomingData.empty(); } -std::shared_ptr<arrow::RecordBatch> TArrowData::GetArrowBatch() const { - return IndexSchema->PrepareForInsert(IncomingData, BatchSchema->GetSchema()); +std::shared_ptr<arrow::RecordBatch> TArrowData::ExtractBatch() { + Y_VERIFY(!!IncomingData); + auto result = IndexSchema->PrepareForInsert(IncomingData, BatchSchema->GetSchema()); + IncomingData = ""; + return result; } ui64 TArrowData::GetSchemaVersion() const { @@ -37,11 +41,15 @@ bool TProtoArrowData::ParseFromProto(const NKikimrTxColumnShard::TEvWrite& proto return false; } } + OriginalDataSize = IncomingData.size(); return !IncomingData.empty() && IncomingData.size() <= NColumnShard::TLimits::GetMaxBlobSize(); } -std::shared_ptr<arrow::RecordBatch> TProtoArrowData::GetArrowBatch() const { - return IndexSchema->PrepareForInsert(IncomingData, ArrowSchema); +std::shared_ptr<arrow::RecordBatch> TProtoArrowData::ExtractBatch() { + Y_VERIFY(!!IncomingData); + auto result = IndexSchema->PrepareForInsert(IncomingData, ArrowSchema); + IncomingData = ""; + return result; } ui64 TProtoArrowData::GetSchemaVersion() const { diff --git a/ydb/core/tx/columnshard/operations/write_data.h b/ydb/core/tx/columnshard/operations/write_data.h index bf8c79eb34f..2604fdfb4c2 100644 --- a/ydb/core/tx/columnshard/operations/write_data.h +++ b/ydb/core/tx/columnshard/operations/write_data.h @@ -39,18 +39,20 @@ public: }; class TArrowData : public NEvWrite::IDataContainer { +private: + std::optional<ui64> OriginalDataSize; public: TArrowData(const NOlap::ISnapshotSchema::TPtr& schema) : IndexSchema(schema) {} - const TString& GetData() const override { - return IncomingData; - } - bool Parse(const NKikimrDataEvents::TOperationData& proto, const IPayloadData& payload); - std::shared_ptr<arrow::RecordBatch> GetArrowBatch() const override; + virtual std::shared_ptr<arrow::RecordBatch> ExtractBatch() override; ui64 GetSchemaVersion() const override; + ui64 GetSize() const override { + Y_VERIFY(OriginalDataSize); + return *OriginalDataSize; + } private: NOlap::ISnapshotSchema::TPtr IndexSchema; @@ -59,18 +61,20 @@ private: }; class TProtoArrowData : public NEvWrite::IDataContainer { +private: + std::optional<ui64> OriginalDataSize; public: TProtoArrowData(const NOlap::ISnapshotSchema::TPtr& schema) : IndexSchema(schema) {} - const TString& GetData() const override { - return IncomingData; - } - bool ParseFromProto(const NKikimrTxColumnShard::TEvWrite& proto); - std::shared_ptr<arrow::RecordBatch> GetArrowBatch() const override; + virtual std::shared_ptr<arrow::RecordBatch> ExtractBatch() override; ui64 GetSchemaVersion() const override; + ui64 GetSize() const override { + Y_VERIFY(OriginalDataSize); + return *OriginalDataSize; + } private: NOlap::ISnapshotSchema::TPtr IndexSchema; diff --git a/ydb/core/tx/ev_write/write_data.cpp b/ydb/core/tx/ev_write/write_data.cpp index feb1e61e5fa..6adf7ab4641 100644 --- a/ydb/core/tx/ev_write/write_data.cpp +++ b/ydb/core/tx/ev_write/write_data.cpp @@ -1,6 +1,7 @@ #include "write_data.h" #include <ydb/core/scheme/scheme_types_proto.h> #include <ydb/core/tx/columnshard/defs.h> +#include <library/cpp/actors/core/log.h> namespace NKikimr::NEvWrite { @@ -12,4 +13,9 @@ TWriteData::TWriteData(const TWriteMeta& writeMeta, IDataContainer::TPtr data) Y_VERIFY(Data); } +const NKikimr::NEvWrite::IDataContainer& TWriteData::GetData() const { + AFL_VERIFY(Data); + return *Data; +} + } diff --git a/ydb/core/tx/ev_write/write_data.h b/ydb/core/tx/ev_write/write_data.h index 81d1d673b2b..1b6b7fe1594 100644 --- a/ydb/core/tx/ev_write/write_data.h +++ b/ydb/core/tx/ev_write/write_data.h @@ -12,9 +12,9 @@ class IDataContainer { public: using TPtr = std::shared_ptr<IDataContainer>; virtual ~IDataContainer() {} - virtual std::shared_ptr<arrow::RecordBatch> GetArrowBatch() const = 0; - virtual const TString& GetData() const = 0; + virtual std::shared_ptr<arrow::RecordBatch> ExtractBatch() = 0; virtual ui64 GetSchemaVersion() const = 0; + virtual ui64 GetSize() const = 0; }; class TWriteMeta { @@ -42,16 +42,15 @@ public: }; class TWriteData { +private: TWriteMeta WriteMeta; IDataContainer::TPtr Data; public: TWriteData(const TWriteMeta& writeMeta, IDataContainer::TPtr data); - const IDataContainer& GetData() const { - return *Data; - } + const IDataContainer& GetData() const; - const IDataContainer::TPtr GetDataPtr() const { + const IDataContainer::TPtr& GetDataPtr() const { return Data; } @@ -64,7 +63,7 @@ public: } ui64 GetSize() const { - return Data->GetData().size(); + return Data->GetSize(); } }; |