aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authornsofya <nsofya@yandex-team.com>2023-07-28 07:26:08 +0300
committernsofya <nsofya@yandex-team.com>2023-07-28 07:26:08 +0300
commitef61acdadf82081f143c7d7611204191f1285e33 (patch)
treec82fb84ee3eb46c262edf44f1464e7e0f0065bd5
parent10e54d8a4d3cb64929101a629dde889e91253a8c (diff)
downloadydb-ef61acdadf82081f143c7d7611204191f1285e33.tar.gz
KIKIMR-18343: Split IDataContainer iface
-rw-r--r--ydb/core/tx/columnshard/columnshard__write.cpp2
-rw-r--r--ydb/core/tx/columnshard/columnshard_ut_common.h2
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/abstract_scheme.cpp16
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h3
-rw-r--r--ydb/core/tx/columnshard/operations/write_data.cpp16
-rw-r--r--ydb/core/tx/columnshard/operations/write_data.h3
-rw-r--r--ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp10
-rw-r--r--ydb/core/tx/ev_write/events.h10
-rw-r--r--ydb/core/tx/ev_write/write_data.h3
9 files changed, 25 insertions, 40 deletions
diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp
index c61206b037..f2a4a91145 100644
--- a/ydb/core/tx/columnshard/columnshard__write.cpp
+++ b/ydb/core/tx/columnshard/columnshard__write.cpp
@@ -48,7 +48,7 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) {
NIceDb::TNiceDb db(txc.DB);
auto writeId = writeMeta.GetWriteId();
- const TString& data = blobData.GetBlobData();
+ const TString data = blobData.GetBlobData();
NKikimrTxColumnShard::TLogicalMetadata meta;
Y_VERIFY(meta.ParseFromString(blobData.GetLogicalMeta()));
diff --git a/ydb/core/tx/columnshard/columnshard_ut_common.h b/ydb/core/tx/columnshard/columnshard_ut_common.h
index 264fc89dee..10a2aff00c 100644
--- a/ydb/core/tx/columnshard/columnshard_ut_common.h
+++ b/ydb/core/tx/columnshard/columnshard_ut_common.h
@@ -469,7 +469,7 @@ namespace NKikimr::NColumnShard {
Y_VERIFY(Index < Owner.Builders.size());
auto& builder = Owner.Builders[Index];
auto type = builder->type();
-
+
NArrow::SwitchType(type->id(), [&](const auto& t) {
using TWrap = std::decay_t<decltype(t)>;
using T = typename TWrap::T;
diff --git a/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.cpp b/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.cpp
index 70aee6e509..edeb20565b 100644
--- a/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.cpp
+++ b/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.cpp
@@ -42,15 +42,15 @@ std::shared_ptr<arrow::RecordBatch> ISnapshotSchema::NormalizeBatch(const ISnaps
return arrow::RecordBatch::Make(resultArrowSchema, batch->num_rows(), newColumns);
}
-std::shared_ptr<arrow::RecordBatch> ISnapshotSchema::PrepareForInsert(const TString& data, const std::shared_ptr<arrow::Schema>& dataSchema, TString& strError) const {
+std::shared_ptr<arrow::RecordBatch> ISnapshotSchema::PrepareForInsert(const TString& data, const std::shared_ptr<arrow::Schema>& dataSchema) const {
std::shared_ptr<arrow::Schema> dstSchema = GetIndexInfo().ArrowSchema();
auto batch = NArrow::DeserializeBatch(data, (dataSchema ? dataSchema : dstSchema));
if (!batch) {
- strError = "DeserializeBatch() failed";
+ AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("error", "DeserializeBatch() failed");
return nullptr;
}
if (batch->num_rows() == 0) {
- strError = "empty batch";
+ AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("error", "empty batch");
return nullptr;
}
@@ -58,13 +58,13 @@ std::shared_ptr<arrow::RecordBatch> ISnapshotSchema::PrepareForInsert(const TStr
if (dataSchema) {
batch = NArrow::ExtractColumns(batch, dstSchema, true);
if (!batch) {
- strError = "cannot correct schema";
+ AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("error", "cannot correct schema");
return nullptr;
}
}
if (!batch->schema()->Equals(dstSchema)) {
- strError = "unexpected schema for insert batch: '" + batch->schema()->ToString() + "'";
+ AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("error", TStringBuilder() << "unexpected schema for insert batch: '" << batch->schema()->ToString() << "'");
return nullptr;
}
@@ -75,18 +75,18 @@ std::shared_ptr<arrow::RecordBatch> ISnapshotSchema::PrepareForInsert(const TStr
for (auto& field : sortingKey->fields()) {
auto column = batch->GetColumnByName(field->name());
if (!column) {
- strError = "missing PK column '" + field->name() + "'";
+ AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("error", TStringBuilder() << "missing PK column '" << field->name() << "'");
return nullptr;
}
if (NArrow::HasNulls(column)) {
- strError = "PK column '" + field->name() + "' contains NULLs";
+ AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("error", TStringBuilder() << "PK column '" << field->name() << "' contains NULLs");
return nullptr;
}
}
auto status = batch->ValidateFull();
if (!status.ok()) {
- strError = status.ToString();
+ AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("error", status.ToString());
return nullptr;
}
batch = NArrow::SortBatch(batch, sortingKey);
diff --git a/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h b/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h
index a485cca697..06fd516712 100644
--- a/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h
+++ b/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h
@@ -36,13 +36,14 @@ public:
virtual int GetFieldIndex(const ui32 columnId) const = 0;
std::shared_ptr<arrow::Field> GetFieldByIndex(const int index) const;
std::shared_ptr<arrow::Field> GetFieldByColumnId(const ui32 columnId) const;
+
virtual const std::shared_ptr<arrow::Schema>& GetSchema() const = 0;
virtual const TIndexInfo& GetIndexInfo() const = 0;
virtual const TSnapshot& GetSnapshot() const = 0;
virtual ui32 GetColumnsCount() const = 0;
std::shared_ptr<arrow::RecordBatch> NormalizeBatch(const ISnapshotSchema& dataSchema, const std::shared_ptr<arrow::RecordBatch> batch) const;
- std::shared_ptr<arrow::RecordBatch> PrepareForInsert(const TString& data, const std::shared_ptr<arrow::Schema>& dataSchema, TString& strError) const;
+ std::shared_ptr<arrow::RecordBatch> PrepareForInsert(const TString& data, const std::shared_ptr<arrow::Schema>& dataSchema) const;
};
} // namespace NKikimr::NOlap
diff --git a/ydb/core/tx/columnshard/operations/write_data.cpp b/ydb/core/tx/columnshard/operations/write_data.cpp
index 38e386f6cc..27f022590a 100644
--- a/ydb/core/tx/columnshard/operations/write_data.cpp
+++ b/ydb/core/tx/columnshard/operations/write_data.cpp
@@ -5,11 +5,6 @@
namespace NKikimr::NColumnShard {
-void TArrowData::Serialize(NKikimrDataEvents::TOperationData& proto) const {
- Y_FAIL("Not implemented");
- Y_UNUSED(proto);
-}
-
bool TArrowData::Parse(const NKikimrDataEvents::TOperationData& proto, const IPayloadData& payload) {
IncomingData = payload.GetDataFromPayload(proto.GetArrowData().GetPayloadIndex());
@@ -22,13 +17,7 @@ bool TArrowData::Parse(const NKikimrDataEvents::TOperationData& proto, const IPa
}
std::shared_ptr<arrow::RecordBatch> TArrowData::GetArrowBatch() const {
- TString err;
- return IndexSchema->PrepareForInsert(IncomingData, BatchSchema->GetSchema(), err);
-}
-
-void TProtoArrowData::Serialize(NKikimrDataEvents::TOperationData& proto) const {
- Y_FAIL("Not implemented");
- Y_UNUSED(proto);
+ return IndexSchema->PrepareForInsert(IncomingData, BatchSchema->GetSchema());
}
bool TProtoArrowData::ParseFromProto(const NKikimrTxColumnShard::TEvWrite& proto) {
@@ -48,8 +37,7 @@ bool TProtoArrowData::ParseFromProto(const NKikimrTxColumnShard::TEvWrite& proto
}
std::shared_ptr<arrow::RecordBatch> TProtoArrowData::GetArrowBatch() const {
- TString err;
- return IndexSchema->PrepareForInsert(IncomingData, ArrowSchema, err);
+ return IndexSchema->PrepareForInsert(IncomingData, ArrowSchema);
}
}
diff --git a/ydb/core/tx/columnshard/operations/write_data.h b/ydb/core/tx/columnshard/operations/write_data.h
index b626c7358d..8bb52f1e66 100644
--- a/ydb/core/tx/columnshard/operations/write_data.h
+++ b/ydb/core/tx/columnshard/operations/write_data.h
@@ -4,6 +4,7 @@
#include <ydb/core/tx/columnshard/common/snapshot.h>
#include <ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h>
#include <ydb/core/tx/columnshard/engines/scheme/filtered_scheme.h>
+#include <ydb/core/protos/ev_write.pb.h>
namespace NKikimr::NColumnShard {
@@ -49,7 +50,6 @@ public:
bool Parse(const NKikimrDataEvents::TOperationData& proto, const IPayloadData& payload);
std::shared_ptr<arrow::RecordBatch> GetArrowBatch() const override;
- void Serialize(NKikimrDataEvents::TOperationData& proto) const override;
private:
NOlap::ISnapshotSchema::TPtr IndexSchema;
@@ -69,7 +69,6 @@ public:
bool ParseFromProto(const NKikimrTxColumnShard::TEvWrite& proto);
std::shared_ptr<arrow::RecordBatch> GetArrowBatch() const override;
- void Serialize(NKikimrDataEvents::TOperationData& proto) const override;
private:
NOlap::ISnapshotSchema::TPtr IndexSchema;
diff --git a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
index 325d3c015a..14a5cceeca 100644
--- a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
+++ b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
@@ -1840,7 +1840,7 @@ void TestReadAggregate(const std::vector<std::pair<TString, TTypeInfo>>& ydbSche
}
Y_UNIT_TEST_SUITE(EvWrite) {
- class TArrowData : public NKikimr::NEvWrite::IDataContainer {
+ class TArrowData : public NKikimr::NEvents::IDataConstructor {
std::vector<std::pair<TString, TTypeInfo>> YdbSchema;
ui64 Index;
@@ -1857,14 +1857,6 @@ Y_UNIT_TEST_SUITE(EvWrite) {
}
proto.MutableArrowData()->SetPayloadIndex(Index);
}
-
- std::shared_ptr<arrow::RecordBatch> GetArrowBatch() const override {
- return nullptr;
- }
-
- const TString& GetData() const override {
- return Default<TString>();
- }
};
Y_UNIT_TEST(WriteInTransaction) {
diff --git a/ydb/core/tx/ev_write/events.h b/ydb/core/tx/ev_write/events.h
index 196fdb6fab..2ddd53c2b3 100644
--- a/ydb/core/tx/ev_write/events.h
+++ b/ydb/core/tx/ev_write/events.h
@@ -11,6 +11,14 @@
namespace NKikimr::NEvents {
+
+class IDataConstructor {
+public:
+ using TPtr = std::shared_ptr<IDataConstructor>;
+ virtual ~IDataConstructor() {}
+ virtual void Serialize(NKikimrDataEvents::TOperationData& proto) const = 0;
+};
+
struct TDataEvents {
class TCoordinatorInfo {
@@ -42,7 +50,7 @@ struct TDataEvents {
Record.SetTxId(txId);
}
- void AddReplaceOp(const ui64 tableId, const NEvWrite::IDataContainer::TPtr& data) {
+ void AddReplaceOp(const ui64 tableId, const IDataConstructor::TPtr& data) {
Record.MutableTableId()->SetTableId(tableId);
Y_VERIFY(data);
data->Serialize(*Record.MutableReplace());
diff --git a/ydb/core/tx/ev_write/write_data.h b/ydb/core/tx/ev_write/write_data.h
index 8f499ad39d..2b9965ceb4 100644
--- a/ydb/core/tx/ev_write/write_data.h
+++ b/ydb/core/tx/ev_write/write_data.h
@@ -3,8 +3,6 @@
#include <ydb/core/tx/long_tx_service/public/types.h>
#include <ydb/core/formats/arrow/arrow_helpers.h>
-#include <ydb/core/protos/tx_columnshard.pb.h>
-#include <ydb/core/protos/ev_write.pb.h>
namespace NKikimr::NEvWrite {
@@ -13,7 +11,6 @@ class IDataContainer {
public:
using TPtr = std::shared_ptr<IDataContainer>;
virtual ~IDataContainer() {}
- virtual void Serialize(NKikimrDataEvents::TOperationData& proto) const = 0;
virtual std::shared_ptr<arrow::RecordBatch> GetArrowBatch() const = 0;
virtual const TString& GetData() const = 0;
};