diff options
author | nsofya <nsofya@yandex-team.com> | 2023-07-28 07:26:08 +0300 |
---|---|---|
committer | nsofya <nsofya@yandex-team.com> | 2023-07-28 07:26:08 +0300 |
commit | ef61acdadf82081f143c7d7611204191f1285e33 (patch) | |
tree | c82fb84ee3eb46c262edf44f1464e7e0f0065bd5 | |
parent | 10e54d8a4d3cb64929101a629dde889e91253a8c (diff) | |
download | ydb-ef61acdadf82081f143c7d7611204191f1285e33.tar.gz |
KIKIMR-18343: Split IDataContainer iface
-rw-r--r-- | ydb/core/tx/columnshard/columnshard__write.cpp | 2 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard_ut_common.h | 2 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/scheme/abstract_scheme.cpp | 16 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h | 3 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/operations/write_data.cpp | 16 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/operations/write_data.h | 3 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp | 10 | ||||
-rw-r--r-- | ydb/core/tx/ev_write/events.h | 10 | ||||
-rw-r--r-- | ydb/core/tx/ev_write/write_data.h | 3 |
9 files changed, 25 insertions, 40 deletions
diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp index c61206b037..f2a4a91145 100644 --- a/ydb/core/tx/columnshard/columnshard__write.cpp +++ b/ydb/core/tx/columnshard/columnshard__write.cpp @@ -48,7 +48,7 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) { NIceDb::TNiceDb db(txc.DB); auto writeId = writeMeta.GetWriteId(); - const TString& data = blobData.GetBlobData(); + const TString data = blobData.GetBlobData(); NKikimrTxColumnShard::TLogicalMetadata meta; Y_VERIFY(meta.ParseFromString(blobData.GetLogicalMeta())); diff --git a/ydb/core/tx/columnshard/columnshard_ut_common.h b/ydb/core/tx/columnshard/columnshard_ut_common.h index 264fc89dee..10a2aff00c 100644 --- a/ydb/core/tx/columnshard/columnshard_ut_common.h +++ b/ydb/core/tx/columnshard/columnshard_ut_common.h @@ -469,7 +469,7 @@ namespace NKikimr::NColumnShard { Y_VERIFY(Index < Owner.Builders.size()); auto& builder = Owner.Builders[Index]; auto type = builder->type(); - + NArrow::SwitchType(type->id(), [&](const auto& t) { using TWrap = std::decay_t<decltype(t)>; using T = typename TWrap::T; diff --git a/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.cpp b/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.cpp index 70aee6e509..edeb20565b 100644 --- a/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.cpp +++ b/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.cpp @@ -42,15 +42,15 @@ std::shared_ptr<arrow::RecordBatch> ISnapshotSchema::NormalizeBatch(const ISnaps return arrow::RecordBatch::Make(resultArrowSchema, batch->num_rows(), newColumns); } -std::shared_ptr<arrow::RecordBatch> ISnapshotSchema::PrepareForInsert(const TString& data, const std::shared_ptr<arrow::Schema>& dataSchema, TString& strError) const { +std::shared_ptr<arrow::RecordBatch> ISnapshotSchema::PrepareForInsert(const TString& data, const std::shared_ptr<arrow::Schema>& dataSchema) const { std::shared_ptr<arrow::Schema> dstSchema = GetIndexInfo().ArrowSchema(); auto batch = NArrow::DeserializeBatch(data, (dataSchema ? dataSchema : dstSchema)); if (!batch) { - strError = "DeserializeBatch() failed"; + AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("error", "DeserializeBatch() failed"); return nullptr; } if (batch->num_rows() == 0) { - strError = "empty batch"; + AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("error", "empty batch"); return nullptr; } @@ -58,13 +58,13 @@ std::shared_ptr<arrow::RecordBatch> ISnapshotSchema::PrepareForInsert(const TStr if (dataSchema) { batch = NArrow::ExtractColumns(batch, dstSchema, true); if (!batch) { - strError = "cannot correct schema"; + AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("error", "cannot correct schema"); return nullptr; } } if (!batch->schema()->Equals(dstSchema)) { - strError = "unexpected schema for insert batch: '" + batch->schema()->ToString() + "'"; + AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("error", TStringBuilder() << "unexpected schema for insert batch: '" << batch->schema()->ToString() << "'"); return nullptr; } @@ -75,18 +75,18 @@ std::shared_ptr<arrow::RecordBatch> ISnapshotSchema::PrepareForInsert(const TStr for (auto& field : sortingKey->fields()) { auto column = batch->GetColumnByName(field->name()); if (!column) { - strError = "missing PK column '" + field->name() + "'"; + AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("error", TStringBuilder() << "missing PK column '" << field->name() << "'"); return nullptr; } if (NArrow::HasNulls(column)) { - strError = "PK column '" + field->name() + "' contains NULLs"; + AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("error", TStringBuilder() << "PK column '" << field->name() << "' contains NULLs"); return nullptr; } } auto status = batch->ValidateFull(); if (!status.ok()) { - strError = status.ToString(); + AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("error", status.ToString()); return nullptr; } batch = NArrow::SortBatch(batch, sortingKey); diff --git a/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h b/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h index a485cca697..06fd516712 100644 --- a/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h +++ b/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h @@ -36,13 +36,14 @@ public: virtual int GetFieldIndex(const ui32 columnId) const = 0; std::shared_ptr<arrow::Field> GetFieldByIndex(const int index) const; std::shared_ptr<arrow::Field> GetFieldByColumnId(const ui32 columnId) const; + virtual const std::shared_ptr<arrow::Schema>& GetSchema() const = 0; virtual const TIndexInfo& GetIndexInfo() const = 0; virtual const TSnapshot& GetSnapshot() const = 0; virtual ui32 GetColumnsCount() const = 0; std::shared_ptr<arrow::RecordBatch> NormalizeBatch(const ISnapshotSchema& dataSchema, const std::shared_ptr<arrow::RecordBatch> batch) const; - std::shared_ptr<arrow::RecordBatch> PrepareForInsert(const TString& data, const std::shared_ptr<arrow::Schema>& dataSchema, TString& strError) const; + std::shared_ptr<arrow::RecordBatch> PrepareForInsert(const TString& data, const std::shared_ptr<arrow::Schema>& dataSchema) const; }; } // namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/operations/write_data.cpp b/ydb/core/tx/columnshard/operations/write_data.cpp index 38e386f6cc..27f022590a 100644 --- a/ydb/core/tx/columnshard/operations/write_data.cpp +++ b/ydb/core/tx/columnshard/operations/write_data.cpp @@ -5,11 +5,6 @@ namespace NKikimr::NColumnShard { -void TArrowData::Serialize(NKikimrDataEvents::TOperationData& proto) const { - Y_FAIL("Not implemented"); - Y_UNUSED(proto); -} - bool TArrowData::Parse(const NKikimrDataEvents::TOperationData& proto, const IPayloadData& payload) { IncomingData = payload.GetDataFromPayload(proto.GetArrowData().GetPayloadIndex()); @@ -22,13 +17,7 @@ bool TArrowData::Parse(const NKikimrDataEvents::TOperationData& proto, const IPa } std::shared_ptr<arrow::RecordBatch> TArrowData::GetArrowBatch() const { - TString err; - return IndexSchema->PrepareForInsert(IncomingData, BatchSchema->GetSchema(), err); -} - -void TProtoArrowData::Serialize(NKikimrDataEvents::TOperationData& proto) const { - Y_FAIL("Not implemented"); - Y_UNUSED(proto); + return IndexSchema->PrepareForInsert(IncomingData, BatchSchema->GetSchema()); } bool TProtoArrowData::ParseFromProto(const NKikimrTxColumnShard::TEvWrite& proto) { @@ -48,8 +37,7 @@ bool TProtoArrowData::ParseFromProto(const NKikimrTxColumnShard::TEvWrite& proto } std::shared_ptr<arrow::RecordBatch> TProtoArrowData::GetArrowBatch() const { - TString err; - return IndexSchema->PrepareForInsert(IncomingData, ArrowSchema, err); + return IndexSchema->PrepareForInsert(IncomingData, ArrowSchema); } } diff --git a/ydb/core/tx/columnshard/operations/write_data.h b/ydb/core/tx/columnshard/operations/write_data.h index b626c7358d..8bb52f1e66 100644 --- a/ydb/core/tx/columnshard/operations/write_data.h +++ b/ydb/core/tx/columnshard/operations/write_data.h @@ -4,6 +4,7 @@ #include <ydb/core/tx/columnshard/common/snapshot.h> #include <ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h> #include <ydb/core/tx/columnshard/engines/scheme/filtered_scheme.h> +#include <ydb/core/protos/ev_write.pb.h> namespace NKikimr::NColumnShard { @@ -49,7 +50,6 @@ public: bool Parse(const NKikimrDataEvents::TOperationData& proto, const IPayloadData& payload); std::shared_ptr<arrow::RecordBatch> GetArrowBatch() const override; - void Serialize(NKikimrDataEvents::TOperationData& proto) const override; private: NOlap::ISnapshotSchema::TPtr IndexSchema; @@ -69,7 +69,6 @@ public: bool ParseFromProto(const NKikimrTxColumnShard::TEvWrite& proto); std::shared_ptr<arrow::RecordBatch> GetArrowBatch() const override; - void Serialize(NKikimrDataEvents::TOperationData& proto) const override; private: NOlap::ISnapshotSchema::TPtr IndexSchema; diff --git a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp index 325d3c015a..14a5cceeca 100644 --- a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp +++ b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp @@ -1840,7 +1840,7 @@ void TestReadAggregate(const std::vector<std::pair<TString, TTypeInfo>>& ydbSche } Y_UNIT_TEST_SUITE(EvWrite) { - class TArrowData : public NKikimr::NEvWrite::IDataContainer { + class TArrowData : public NKikimr::NEvents::IDataConstructor { std::vector<std::pair<TString, TTypeInfo>> YdbSchema; ui64 Index; @@ -1857,14 +1857,6 @@ Y_UNIT_TEST_SUITE(EvWrite) { } proto.MutableArrowData()->SetPayloadIndex(Index); } - - std::shared_ptr<arrow::RecordBatch> GetArrowBatch() const override { - return nullptr; - } - - const TString& GetData() const override { - return Default<TString>(); - } }; Y_UNIT_TEST(WriteInTransaction) { diff --git a/ydb/core/tx/ev_write/events.h b/ydb/core/tx/ev_write/events.h index 196fdb6fab..2ddd53c2b3 100644 --- a/ydb/core/tx/ev_write/events.h +++ b/ydb/core/tx/ev_write/events.h @@ -11,6 +11,14 @@ namespace NKikimr::NEvents { + +class IDataConstructor { +public: + using TPtr = std::shared_ptr<IDataConstructor>; + virtual ~IDataConstructor() {} + virtual void Serialize(NKikimrDataEvents::TOperationData& proto) const = 0; +}; + struct TDataEvents { class TCoordinatorInfo { @@ -42,7 +50,7 @@ struct TDataEvents { Record.SetTxId(txId); } - void AddReplaceOp(const ui64 tableId, const NEvWrite::IDataContainer::TPtr& data) { + void AddReplaceOp(const ui64 tableId, const IDataConstructor::TPtr& data) { Record.MutableTableId()->SetTableId(tableId); Y_VERIFY(data); data->Serialize(*Record.MutableReplace()); diff --git a/ydb/core/tx/ev_write/write_data.h b/ydb/core/tx/ev_write/write_data.h index 8f499ad39d..2b9965ceb4 100644 --- a/ydb/core/tx/ev_write/write_data.h +++ b/ydb/core/tx/ev_write/write_data.h @@ -3,8 +3,6 @@ #include <ydb/core/tx/long_tx_service/public/types.h> #include <ydb/core/formats/arrow/arrow_helpers.h> -#include <ydb/core/protos/tx_columnshard.pb.h> -#include <ydb/core/protos/ev_write.pb.h> namespace NKikimr::NEvWrite { @@ -13,7 +11,6 @@ class IDataContainer { public: using TPtr = std::shared_ptr<IDataContainer>; virtual ~IDataContainer() {} - virtual void Serialize(NKikimrDataEvents::TOperationData& proto) const = 0; virtual std::shared_ptr<arrow::RecordBatch> GetArrowBatch() const = 0; virtual const TString& GetData() const = 0; }; |