summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author: ivanmorozov <[email protected]> 2023-09-29 12:40:25 +0300
committer: ivanmorozov <[email protected]> 2023-09-29 13:15:53 +0300
commit: d1b6c34393585eae85b13c71f4ca6125ad908ddd (patch)
tree: 837bd729c1ad81914707922df586700afaab9c46
parent: 9f05ad4a7c0f811fd2f7b6de45c1e1e7c3b29903 (diff)
KIKIMR-19211: don't store original blob after unpack
-rw-r--r-- ydb/core/tx/columnshard/columnshard__write.cpp | 2
-rw-r--r-- ydb/core/tx/columnshard/operations/slice_builder.cpp | 4
-rw-r--r-- ydb/core/tx/columnshard/operations/write_data.cpp | 16
-rw-r--r-- ydb/core/tx/columnshard/operations/write_data.h | 24
-rw-r--r-- ydb/core/tx/ev_write/write_data.cpp | 6
-rw-r--r-- ydb/core/tx/ev_write/write_data.h | 13
6 files changed, 42 insertions, 23 deletions
diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp
index 017df46d15e..802f6fdbd15 100644
--- a/ydb/core/tx/columnshard/columnshard__write.cpp
+++ b/ydb/core/tx/columnshard/columnshard__write.cpp
@@ -215,7 +215,7 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
return;
}
- auto wg = WritesMonitor.RegisterWrite(arrowData->GetData().size());
+ auto wg = WritesMonitor.RegisterWrite(arrowData->GetSize());
auto operation = OperationsManager.RegisterOperation(txId);
Y_VERIFY(operation);
operation->Start(*this, tableId, arrowData, source, ctx);
diff --git a/ydb/core/tx/columnshard/operations/slice_builder.cpp b/ydb/core/tx/columnshard/operations/slice_builder.cpp
index 6988c621f1a..14792dacbbf 100644
--- a/ydb/core/tx/columnshard/operations/slice_builder.cpp
+++ b/ydb/core/tx/columnshard/operations/slice_builder.cpp
@@ -12,7 +12,7 @@ std::optional<std::vector<NKikimr::NArrow::TSerializedBatch>> TBuildSlicesTask::
const ui64 tableId = writeMeta.GetTableId();
const ui64 writeId = writeMeta.GetWriteId();
- std::shared_ptr<arrow::RecordBatch> batch = WriteData.GetData().GetArrowBatch();
+ std::shared_ptr<arrow::RecordBatch> batch = WriteData.GetDataPtr()->ExtractBatch();
if (!batch) {
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "ev_write_bad_data")("write_id", writeId)("table_id", tableId);
@@ -34,7 +34,9 @@ std::optional<std::vector<NKikimr::NArrow::TSerializedBatch>> TBuildSlicesTask::
}
bool TBuildSlicesTask::DoExecute() {
+ WriteData.MutableWriteMeta().SetWriteMiddle2StartInstant(TMonotonic::Now());
auto batches = BuildSlices();
+ WriteData.MutableWriteMeta().SetWriteMiddle3StartInstant(TMonotonic::Now());
if (batches) {
auto writeController = std::make_shared<NOlap::TIndexedWriteController>(ParentActorId, WriteData, Action, std::move(*batches));
if (batches && Action->NeedDraftTransaction()) {
diff --git a/ydb/core/tx/columnshard/operations/write_data.cpp b/ydb/core/tx/columnshard/operations/write_data.cpp
index 18e1d4e7bd1..dcbf58c9a28 100644
--- a/ydb/core/tx/columnshard/operations/write_data.cpp
+++ b/ydb/core/tx/columnshard/operations/write_data.cpp
@@ -13,11 +13,15 @@ bool TArrowData::Parse(const NKikimrDataEvents::TOperationData& proto, const IPa
columns.emplace_back(columnId);
}
BatchSchema = std::make_shared<NOlap::TFilteredSnapshotSchema>(IndexSchema, columns);
+ OriginalDataSize = IncomingData.size();
return BatchSchema->GetColumnsCount() == columns.size() && !IncomingData.empty();
}
-std::shared_ptr<arrow::RecordBatch> TArrowData::GetArrowBatch() const {
- return IndexSchema->PrepareForInsert(IncomingData, BatchSchema->GetSchema());
+std::shared_ptr<arrow::RecordBatch> TArrowData::ExtractBatch() {
+ Y_VERIFY(!!IncomingData);
+ auto result = IndexSchema->PrepareForInsert(IncomingData, BatchSchema->GetSchema());
+ IncomingData = "";
+ return result;
}
ui64 TArrowData::GetSchemaVersion() const {
@@ -37,11 +41,15 @@ bool TProtoArrowData::ParseFromProto(const NKikimrTxColumnShard::TEvWrite& proto
return false;
}
}
+ OriginalDataSize = IncomingData.size();
return !IncomingData.empty() && IncomingData.size() <= NColumnShard::TLimits::GetMaxBlobSize();
}
-std::shared_ptr<arrow::RecordBatch> TProtoArrowData::GetArrowBatch() const {
- return IndexSchema->PrepareForInsert(IncomingData, ArrowSchema);
+std::shared_ptr<arrow::RecordBatch> TProtoArrowData::ExtractBatch() {
+ Y_VERIFY(!!IncomingData);
+ auto result = IndexSchema->PrepareForInsert(IncomingData, ArrowSchema);
+ IncomingData = "";
+ return result;
}
ui64 TProtoArrowData::GetSchemaVersion() const {
diff --git a/ydb/core/tx/columnshard/operations/write_data.h b/ydb/core/tx/columnshard/operations/write_data.h
index bf8c79eb34f..2604fdfb4c2 100644
--- a/ydb/core/tx/columnshard/operations/write_data.h
+++ b/ydb/core/tx/columnshard/operations/write_data.h
@@ -39,18 +39,20 @@ public:
};
class TArrowData : public NEvWrite::IDataContainer {
+private:
+ std::optional<ui64> OriginalDataSize;
public:
TArrowData(const NOlap::ISnapshotSchema::TPtr& schema)
: IndexSchema(schema)
{}
- const TString& GetData() const override {
- return IncomingData;
- }
-
bool Parse(const NKikimrDataEvents::TOperationData& proto, const IPayloadData& payload);
- std::shared_ptr<arrow::RecordBatch> GetArrowBatch() const override;
+ virtual std::shared_ptr<arrow::RecordBatch> ExtractBatch() override;
ui64 GetSchemaVersion() const override;
+ ui64 GetSize() const override {
+ Y_VERIFY(OriginalDataSize);
+ return *OriginalDataSize;
+ }
private:
NOlap::ISnapshotSchema::TPtr IndexSchema;
@@ -59,18 +61,20 @@ private:
};
class TProtoArrowData : public NEvWrite::IDataContainer {
+private:
+ std::optional<ui64> OriginalDataSize;
public:
TProtoArrowData(const NOlap::ISnapshotSchema::TPtr& schema)
: IndexSchema(schema)
{}
- const TString& GetData() const override {
- return IncomingData;
- }
-
bool ParseFromProto(const NKikimrTxColumnShard::TEvWrite& proto);
- std::shared_ptr<arrow::RecordBatch> GetArrowBatch() const override;
+ virtual std::shared_ptr<arrow::RecordBatch> ExtractBatch() override;
ui64 GetSchemaVersion() const override;
+ ui64 GetSize() const override {
+ Y_VERIFY(OriginalDataSize);
+ return *OriginalDataSize;
+ }
private:
NOlap::ISnapshotSchema::TPtr IndexSchema;
diff --git a/ydb/core/tx/ev_write/write_data.cpp b/ydb/core/tx/ev_write/write_data.cpp
index feb1e61e5fa..6adf7ab4641 100644
--- a/ydb/core/tx/ev_write/write_data.cpp
+++ b/ydb/core/tx/ev_write/write_data.cpp
@@ -1,6 +1,7 @@
#include "write_data.h"
#include <ydb/core/scheme/scheme_types_proto.h>
#include <ydb/core/tx/columnshard/defs.h>
+#include <library/cpp/actors/core/log.h>
namespace NKikimr::NEvWrite {
@@ -12,4 +13,9 @@ TWriteData::TWriteData(const TWriteMeta& writeMeta, IDataContainer::TPtr data)
Y_VERIFY(Data);
}
+const NKikimr::NEvWrite::IDataContainer& TWriteData::GetData() const {
+ AFL_VERIFY(Data);
+ return *Data;
+}
+
}
diff --git a/ydb/core/tx/ev_write/write_data.h b/ydb/core/tx/ev_write/write_data.h
index 81d1d673b2b..1b6b7fe1594 100644
--- a/ydb/core/tx/ev_write/write_data.h
+++ b/ydb/core/tx/ev_write/write_data.h
@@ -12,9 +12,9 @@ class IDataContainer {
public:
using TPtr = std::shared_ptr<IDataContainer>;
virtual ~IDataContainer() {}
- virtual std::shared_ptr<arrow::RecordBatch> GetArrowBatch() const = 0;
- virtual const TString& GetData() const = 0;
+ virtual std::shared_ptr<arrow::RecordBatch> ExtractBatch() = 0;
virtual ui64 GetSchemaVersion() const = 0;
+ virtual ui64 GetSize() const = 0;
};
class TWriteMeta {
@@ -42,16 +42,15 @@ public:
};
class TWriteData {
+private:
TWriteMeta WriteMeta;
IDataContainer::TPtr Data;
public:
TWriteData(const TWriteMeta& writeMeta, IDataContainer::TPtr data);
- const IDataContainer& GetData() const {
- return *Data;
- }
+ const IDataContainer& GetData() const;
- const IDataContainer::TPtr GetDataPtr() const {
+ const IDataContainer::TPtr& GetDataPtr() const {
return Data;
}
@@ -64,7 +63,7 @@ public:
}
ui64 GetSize() const {
- return Data->GetData().size();
+ return Data->GetSize();
}
};