diff options
author | Artem Zuikov <[email protected]> | 2022-06-09 14:39:36 +0300 |
---|---|---|
committer | Artem Zuikov <[email protected]> | 2022-06-09 14:39:36 +0300 |
commit | 0e7645a1547e5ac0e618721fa207c35ee444c247 (patch) | |
tree | ea6ac050f3da3cacf18135fb62f4a7c452e25f13 | |
parent | c04b663c7bb4b750deeb8f48f620497ec13da8fa (diff) |
KIKIMR-15046: fix upsert JsonDocument via CSV or Arrow
ref:1b132310111e00aeeae70bb01445128b1e515d43
-rw-r--r-- | CMakeLists.darwin.txt | 2 | ||||
-rw-r--r-- | CMakeLists.linux.txt | 2 | ||||
-rw-r--r-- | ydb/core/formats/CMakeLists.txt | 5 | ||||
-rw-r--r-- | ydb/core/formats/arrow_helpers.cpp | 120 | ||||
-rw-r--r-- | ydb/core/formats/arrow_helpers.h | 15 | ||||
-rw-r--r-- | ydb/core/formats/ut/CMakeLists.darwin.txt | 3 | ||||
-rw-r--r-- | ydb/core/formats/ut/CMakeLists.linux.txt | 3 | ||||
-rw-r--r-- | ydb/core/formats/ut_arrow.cpp | 72 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/CMakeLists.txt | 2 | ||||
-rw-r--r-- | ydb/core/tx/tx_proxy/upload_rows_common_impl.h | 37 | ||||
-rw-r--r-- | ydb/services/ydb/ydb_bulk_upsert_olap_ut.cpp | 101 | ||||
-rw-r--r-- | ydb/services/ydb/ydb_common_ut.h | 7 |
12 files changed, 300 insertions, 69 deletions
diff --git a/CMakeLists.darwin.txt b/CMakeLists.darwin.txt index a98538a6d61..1b2cf3f02c1 100644 --- a/CMakeLists.darwin.txt +++ b/CMakeLists.darwin.txt @@ -789,6 +789,7 @@ add_subdirectory(library/cpp/containers/ring_buffer) add_subdirectory(ydb/core/security) add_subdirectory(ydb/core/tx/columnshard) add_subdirectory(ydb/core/tx/columnshard/engines) +add_subdirectory(ydb/library/yql/public/udf/service/exception_policy) add_subdirectory(ydb/core/tx/coordinator) add_subdirectory(ydb/core/tx/long_tx_service) add_subdirectory(ydb/core/tx/mediator) @@ -827,7 +828,6 @@ add_subdirectory(ydb/library/yql/providers/solomon/expr_nodes) add_subdirectory(ydb/library/yql/providers/ydb/actors) add_subdirectory(ydb/library/yql/providers/ydb/comp_nodes) add_subdirectory(ydb/core/yq/libs/logs) -add_subdirectory(ydb/library/yql/public/udf/service/exception_policy) add_subdirectory(ydb/services/auth) add_subdirectory(ydb/services/cms) add_subdirectory(ydb/services/discovery) diff --git a/CMakeLists.linux.txt b/CMakeLists.linux.txt index a9e3cd5b372..7ad724d2a11 100644 --- a/CMakeLists.linux.txt +++ b/CMakeLists.linux.txt @@ -869,6 +869,7 @@ add_subdirectory(library/cpp/containers/ring_buffer) add_subdirectory(ydb/core/security) add_subdirectory(ydb/core/tx/columnshard) add_subdirectory(ydb/core/tx/columnshard/engines) +add_subdirectory(ydb/library/yql/public/udf/service/exception_policy) add_subdirectory(ydb/core/tx/coordinator) add_subdirectory(ydb/core/tx/long_tx_service) add_subdirectory(ydb/core/tx/mediator) @@ -925,7 +926,6 @@ add_subdirectory(ydb/library/yql/providers/solomon/expr_nodes) add_subdirectory(ydb/library/yql/providers/ydb/actors) add_subdirectory(ydb/library/yql/providers/ydb/comp_nodes) add_subdirectory(ydb/core/yq/libs/logs) -add_subdirectory(ydb/library/yql/public/udf/service/exception_policy) add_subdirectory(ydb/services/auth) add_subdirectory(ydb/services/cms) add_subdirectory(ydb/services/discovery) diff --git a/ydb/core/formats/CMakeLists.txt b/ydb/core/formats/CMakeLists.txt index f61a7c54d19..993180c9e4f 100644 --- a/ydb/core/formats/CMakeLists.txt +++ b/ydb/core/formats/CMakeLists.txt @@ -8,11 +8,16 @@ add_library(ydb-core-formats) +target_compile_options(ydb-core-formats PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) target_link_libraries(ydb-core-formats PUBLIC contrib-libs-cxxsupp yutil libs-apache-arrow ydb-core-scheme + ydb-library-binary_json + ydb-library-dynumber ) target_sources(ydb-core-formats PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow_batch_builder.cpp diff --git a/ydb/core/formats/arrow_helpers.cpp b/ydb/core/formats/arrow_helpers.cpp index 3e1e1b0444a..72ff6fd234e 100644 --- a/ydb/core/formats/arrow_helpers.cpp +++ b/ydb/core/formats/arrow_helpers.cpp @@ -2,6 +2,10 @@ #include "switch_type.h" #include "one_batch_input_stream.h" #include "merging_sorted_input_stream.h" + +#include <ydb/library/binary_json/write.h> +#include <ydb/library/dynumber/dynumber.h> +#include <util/memory/pool.h> #include <util/system/yassert.h> #include <contrib/libs/apache/arrow/cpp/src/arrow/io/memory.h> #include <contrib/libs/apache/arrow/cpp/src/arrow/ipc/reader.h> @@ -912,6 +916,99 @@ std::shared_ptr<arrow::RecordBatch> SortBatch(const std::shared_ptr<arrow::Recor return Reorder(batch, sortPermutation); } +static bool ConvertData(TCell& cell, const NScheme::TTypeId& colType, TMemoryPool& memPool, TString& errorMessage) { + switch (colType) { + case NScheme::NTypeIds::DyNumber: { + const auto dyNumber = NDyNumber::ParseDyNumberString(cell.AsBuf()); + if (!dyNumber.Defined()) { + errorMessage = "Invalid DyNumber string representation"; + return false; + } + const auto dyNumberInPool = memPool.AppendString(TStringBuf(*dyNumber)); + cell = TCell(dyNumberInPool.data(), dyNumberInPool.size()); + break; + } + case NScheme::NTypeIds::JsonDocument: { + const auto binaryJson = NBinaryJson::SerializeToBinaryJson(cell.AsBuf()); + if (!binaryJson.Defined()) { + errorMessage = "Invalid JSON for JsonDocument provided"; + return false; + } + const auto saved = memPool.AppendString(TStringBuf(binaryJson->Data(), binaryJson->Size())); + cell = TCell(saved.data(), saved.size()); + break; + } + case NScheme::NTypeIds::Decimal: + errorMessage = "Decimal conversion is not supported yet"; + return false; + default: + break; + } + return true; +} + +static std::shared_ptr<arrow::Array> ConvertColumn(const std::shared_ptr<arrow::Array>& column, + NScheme::TTypeId colType) { + if (colType == NScheme::NTypeIds::Decimal) { + return {}; + } + + if (column->type()->id() != arrow::Type::BINARY) { + return {}; + } + + auto& binaryArray = static_cast<arrow::BinaryArray&>(*column); + arrow::BinaryBuilder builder; + builder.Reserve(binaryArray.length()).ok(); + // TODO: ReserveData + + switch (colType) { + case NScheme::NTypeIds::DyNumber: { + for (i32 i = 0; i < binaryArray.length(); ++i) { + auto value = binaryArray.Value(i); + const auto dyNumber = NDyNumber::ParseDyNumberString(TStringBuf(value.data(), value.size())); + if (!dyNumber.Defined() || !builder.Append((*dyNumber).data(), (*dyNumber).size()).ok()) { + return {}; + } + } + } + case NScheme::NTypeIds::JsonDocument: { + for (i32 i = 0; i < binaryArray.length(); ++i) { + auto value = binaryArray.Value(i); + const auto binaryJson = NBinaryJson::SerializeToBinaryJson(TStringBuf(value.data(), value.size())); + if (!binaryJson.Defined() || !builder.Append(binaryJson->Data(), binaryJson->Size()).ok()) { + return {}; + } + } + } + default: + break; + } + + std::shared_ptr<arrow::BinaryArray> result; + if (!builder.Finish(&result).ok()) { + return {}; + } + return result; +} + +std::shared_ptr<arrow::RecordBatch> ConvertColumns(const std::shared_ptr<arrow::RecordBatch>& batch, + const THashMap<TString, NScheme::TTypeId>& columnsToConvert) +{ + std::vector<std::shared_ptr<arrow::Array>> columns = batch->columns(); + for (i32 i = 0; i < batch->num_columns(); ++i) { + auto& colName = batch->column_name(i); + auto it = columnsToConvert.find(TString(colName.data(), colName.size())); + if (it != columnsToConvert.end()) { + columns[i] = ConvertColumn(columns[i], it->second); + if (!columns[i]) { + return {}; + } + } + } + return arrow::RecordBatch::Make(batch->schema(), batch->num_rows(), columns); +} + bool TArrowToYdbConverter::Process(const arrow::RecordBatch& batch, TString& errorMessage) { std::vector<std::shared_ptr<arrow::Array>> allColumns; allColumns.reserve(YdbSchema.size()); @@ -929,6 +1026,8 @@ bool TArrowToYdbConverter::Process(const arrow::RecordBatch& batch, TString& err std::vector<TSmallVec<TCell>> cells; i64 row = 0; + TMemoryPool memPool(256); // for convertions + #if 1 // optimization static constexpr i32 unroll = 32; cells.reserve(unroll); @@ -959,6 +1058,16 @@ bool TArrowToYdbConverter::Process(const arrow::RecordBatch& batch, TString& err << " at column '" << colName << "'"; return false; } + + if (NeedDataConversion(colType)) { + memPool.Clear(); + for (i32 i = 0; i < unroll; ++i) { + if (!ConvertData(cells[i][col], colType, memPool, errorMessage)) { + return false; + } + } + } + ++col; } @@ -973,17 +1082,20 @@ bool TArrowToYdbConverter::Process(const arrow::RecordBatch& batch, TString& err #endif for (; row < batch.num_rows(); ++row) { + memPool.Clear(); + ui32 col = 0; for (auto& [colName, colType] : YdbSchema) { auto& column = allColumns[col]; + auto& curCell = cells[0][col]; if (column->IsNull(row)) { - cells[0][col] = TCell(); + curCell = TCell(); continue; } bool success = SwitchYqlTypeToArrowType(colType, [&]<typename TType>(TTypeWrapper<TType> typeHolder) { Y_UNUSED(typeHolder); - cells[0][col] = MakeCell<typename arrow::TypeTraits<TType>::ArrayType>(column, row); + curCell = MakeCell<typename arrow::TypeTraits<TType>::ArrayType>(column, row); return true; }); @@ -992,6 +1104,10 @@ bool TArrowToYdbConverter::Process(const arrow::RecordBatch& batch, TString& err << " at column '" << colName << "'"; return false; } + + if (!ConvertData(curCell, colType, memPool, errorMessage)) { + return false; + } ++col; } diff --git a/ydb/core/formats/arrow_helpers.h b/ydb/core/formats/arrow_helpers.h index cd3ec9f865f..19d15052700 100644 --- a/ydb/core/formats/arrow_helpers.h +++ b/ydb/core/formats/arrow_helpers.h @@ -181,6 +181,18 @@ private: } public: + static bool NeedDataConversion(const NScheme::TTypeId& colType) { + switch (colType) { + case NScheme::NTypeIds::DyNumber: + case NScheme::NTypeIds::JsonDocument: + case NScheme::NTypeIds::Decimal: + return true; + default: + break; + } + return false; + } + TArrowToYdbConverter(const TVector<std::pair<TString, NScheme::TTypeId>>& ydbSchema, IRowWriter& rowWriter) : YdbSchema(ydbSchema) , RowWriter(rowWriter) @@ -198,6 +210,9 @@ public: virtual void AddRow(const TConstArrayRef<TCell> &cells) = 0; }; +std::shared_ptr<arrow::RecordBatch> ConvertColumns(const std::shared_ptr<arrow::RecordBatch>& batch, + const THashMap<TString, NScheme::TTypeId>& columnsToConvert); + inline bool HasNulls(const std::shared_ptr<arrow::Array>& column) { return column->null_bitmap_data(); } diff --git a/ydb/core/formats/ut/CMakeLists.darwin.txt b/ydb/core/formats/ut/CMakeLists.darwin.txt index 63729f5691e..2f19d1653bc 100644 --- a/ydb/core/formats/ut/CMakeLists.darwin.txt +++ b/ydb/core/formats/ut/CMakeLists.darwin.txt @@ -10,6 +10,7 @@ add_executable(ydb-core-formats-ut) target_compile_options(ydb-core-formats-ut PRIVATE -Wno-unused-parameter + -DUSE_CURRENT_UDF_ABI_VERSION ) target_include_directories(ydb-core-formats-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/formats @@ -22,6 +23,8 @@ target_link_libraries(ydb-core-formats-ut PUBLIC ydb-core-formats libs-apache-arrow ydb-core-base + udf-service-exception_policy + yql-sql-pg_dummy ) target_link_options(ydb-core-formats-ut PRIVATE -Wl,-no_deduplicate diff --git a/ydb/core/formats/ut/CMakeLists.linux.txt b/ydb/core/formats/ut/CMakeLists.linux.txt index 286bb5eb02c..38c63da7034 100644 --- a/ydb/core/formats/ut/CMakeLists.linux.txt +++ b/ydb/core/formats/ut/CMakeLists.linux.txt @@ -10,6 +10,7 @@ add_executable(ydb-core-formats-ut) target_compile_options(ydb-core-formats-ut PRIVATE -Wno-unused-parameter + -DUSE_CURRENT_UDF_ABI_VERSION ) target_include_directories(ydb-core-formats-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/formats @@ -23,6 +24,8 @@ target_link_libraries(ydb-core-formats-ut PUBLIC ydb-core-formats libs-apache-arrow ydb-core-base + udf-service-exception_policy + yql-sql-pg_dummy ) target_link_options(ydb-core-formats-ut PRIVATE -ldl diff --git a/ydb/core/formats/ut_arrow.cpp b/ydb/core/formats/ut_arrow.cpp index 344bbe7cb8e..291300bdd31 100644 --- a/ydb/core/formats/ut_arrow.cpp +++ b/ydb/core/formats/ut_arrow.cpp @@ -2,6 +2,7 @@ #include <ydb/core/formats/arrow_helpers.h> #include <ydb/core/formats/one_batch_input_stream.h> #include <ydb/core/formats/merging_sorted_input_stream.h> +#include <ydb/library/binary_json/write.h> #include <library/cpp/testing/unittest/registar.h> #include <util/string/printf.h> #include <contrib/libs/apache/arrow/cpp/src/arrow/api.h> @@ -35,8 +36,8 @@ struct TDataRow { NTypeIds::Datetime, NTypeIds::Timestamp, NTypeIds::Interval, - NTypeIds::Decimal - // NTypeIds::PairUi64Ui64, NTypeIds::ActorId, NTypeIds::StepOrderId + NTypeIds::JsonDocument, + // TODO: DyNumber, Decimal }; bool Bool; @@ -58,7 +59,8 @@ struct TDataRow { ui32 Datetime; i64 Timestamp; i64 Interval; - ui64 Decimal[2]; + std::string JsonDocument; + //ui64 Decimal[2]; bool operator == (const TDataRow& r) const { return (Bool == r.Bool) && @@ -80,7 +82,8 @@ struct TDataRow { (Datetime == r.Datetime) && (Timestamp == r.Timestamp) && (Interval == r.Interval) && - (Decimal[0] == r.Decimal[0] && Decimal[1] == r.Decimal[1]); + (JsonDocument == r.JsonDocument); + //(Decimal[0] == r.Decimal[0] && Decimal[1] == r.Decimal[1]); } static std::shared_ptr<arrow::Schema> MakeArrowSchema() { @@ -104,7 +107,8 @@ struct TDataRow { arrow::field("datetime", arrow::uint32()), arrow::field("ts", arrow::timestamp(arrow::TimeUnit::TimeUnit::MICRO)), arrow::field("ival", arrow::duration(arrow::TimeUnit::TimeUnit::MICRO)), - arrow::field("dec", arrow::decimal(NScheme::DECIMAL_PRECISION, NScheme::DECIMAL_SCALE)), + arrow::field("json_doc", arrow::binary()), + //arrow::field("dec", arrow::decimal(NScheme::DECIMAL_PRECISION, NScheme::DECIMAL_SCALE)), }; return std::make_shared<arrow::Schema>(fields); @@ -131,7 +135,8 @@ struct TDataRow { {"datetime", NTypeIds::Datetime }, {"ts", NTypeIds::Timestamp }, {"ival", NTypeIds::Interval }, - {"dec", NTypeIds::Decimal } + {"json_doc", NTypeIds::JsonDocument }, + //{"dec", NTypeIds::Decimal } }; return columns; } @@ -157,10 +162,22 @@ struct TDataRow { Cells[16] = TCell::Make<ui32>(Datetime); Cells[17] = TCell::Make<i64>(Timestamp); Cells[18] = TCell::Make<i64>(Interval); - Cells[19] = TCell((const char *)&Decimal[0], 16); + Cells[19] = TCell(JsonDocument.data(), JsonDocument.size()); + //Cells[19] = TCell((const char *)&Decimal[0], 16); return NKikimr::TDbTupleRef(Types, Cells, 20); } + + TOwnedCellVec SerializedCells() const { + NKikimr::TDbTupleRef value = ToDbTupleRef(); + TVector<TCell> cells(value.Cells().data(), value.Cells().data() + value.Cells().size()); + + auto binaryJson = NBinaryJson::SerializeToBinaryJson(TStringBuf(JsonDocument.data(), JsonDocument.size())); + UNIT_ASSERT(binaryJson.Defined()); + + cells[19] = TCell(binaryJson->Data(), binaryJson->Size()); + return TOwnedCellVec(cells); + } }; @@ -198,18 +215,19 @@ std::vector<TDataRow> ToVector(const std::shared_ptr<T>& table) { auto arts = std::static_pointer_cast<arrow::TimestampArray>(GetColumn(*table, 17)); auto arival = std::static_pointer_cast<arrow::DurationArray>(GetColumn(*table, 18)); - auto ardec = std::static_pointer_cast<arrow::Decimal128Array>(GetColumn(*table, 19)); + auto arjd = std::static_pointer_cast<arrow::BinaryArray>(GetColumn(*table, 19)); + //auto ardec = std::static_pointer_cast<arrow::Decimal128Array>(GetColumn(*table, 19)); for (int64_t i = 0; i < table->num_rows(); ++i) { - ui64 dec[2]; - memcpy(dec, ardec->Value(i), 16); + //ui64 dec[2]; + //memcpy(dec, ardec->Value(i), 16); TDataRow r{ arbool->Value(i), ari8->Value(i), ari16->Value(i), ari32->Value(i), ari64->Value(i), aru8->Value(i), aru16->Value(i), aru32->Value(i), aru64->Value(i), arf32->Value(i), arf64->Value(i), arstr->GetString(i), arutf->GetString(i), arj->GetString(i), ary->GetString(i), - ard->Value(i), ardt->Value(i), arts->Value(i), arival->Value(i), - {dec[0], dec[1]} + ard->Value(i), ardt->Value(i), arts->Value(i), arival->Value(i), arjd->GetString(i) + //{dec[0], dec[1]} }; rows.emplace_back(std::move(r)); } @@ -223,7 +241,7 @@ public: TDataRowTableBuilder() : Bts(arrow::timestamp(arrow::TimeUnit::TimeUnit::MICRO), arrow::default_memory_pool()) , Bival(arrow::duration(arrow::TimeUnit::TimeUnit::MICRO), arrow::default_memory_pool()) - , Bdec(arrow::decimal(NScheme::DECIMAL_PRECISION, NScheme::DECIMAL_SCALE), arrow::default_memory_pool()) + //, Bdec(arrow::decimal(NScheme::DECIMAL_PRECISION, NScheme::DECIMAL_SCALE), arrow::default_memory_pool()) {} void AddRow(const TDataRow& row) { @@ -249,7 +267,8 @@ public: UNIT_ASSERT(Bts.Append(row.Timestamp).ok()); UNIT_ASSERT(Bival.Append(row.Interval).ok()); - UNIT_ASSERT(Bdec.Append((const char *)&row.Decimal).ok()); + UNIT_ASSERT(Bjd.Append(row.JsonDocument).ok()); + //UNIT_ASSERT(Bdec.Append((const char *)&row.Decimal).ok()); } std::shared_ptr<arrow::Table> Finish() { @@ -275,7 +294,8 @@ public: std::shared_ptr<arrow::TimestampArray> arts; std::shared_ptr<arrow::DurationArray> arival; - std::shared_ptr<arrow::Decimal128Array> ardec; + std::shared_ptr<arrow::BinaryArray> arjd; + //std::shared_ptr<arrow::Decimal128Array> ardec; UNIT_ASSERT(Bbool.Finish(&arbool).ok()); UNIT_ASSERT(Bi8.Finish(&ari8).ok()); @@ -299,7 +319,8 @@ public: UNIT_ASSERT(Bts.Finish(&arts).ok()); UNIT_ASSERT(Bival.Finish(&arival).ok()); - UNIT_ASSERT(Bdec.Finish(&ardec).ok()); + UNIT_ASSERT(Bjd.Finish(&arjd).ok()); + //UNIT_ASSERT(Bdec.Finish(&ardec).ok()); std::shared_ptr<arrow::Schema> schema = TDataRow::MakeArrowSchema(); return arrow::Table::Make(schema, { @@ -308,8 +329,8 @@ public: aru8, aru16, aru32, aru64, arf32, arf64, arstr, arutf, arj, ary, - ard, ardt, arts, arival, - ardec + ard, ardt, arts, arival, arjd + //ardec }); } @@ -341,7 +362,8 @@ private: arrow::UInt32Builder Bdt; arrow::TimestampBuilder Bts; arrow::DurationBuilder Bival; - arrow::Decimal128Builder Bdec; + arrow::BinaryBuilder Bjd; + //arrow::Decimal128Builder Bdec; }; std::shared_ptr<arrow::RecordBatch> VectorToBatch(const std::vector<struct TDataRow>& rows) { @@ -360,10 +382,10 @@ std::shared_ptr<arrow::RecordBatch> VectorToBatch(const std::vector<struct TData std::vector<TDataRow> TestRows() { std::vector<TDataRow> rows = { - {false, -1, -1, -1, -1, 1, 1, 1, 1, -1.0f, -1.0, "s1", "u1", "{j:1}", "{y:1}", 0, 0, 0, 0, {0,0} }, - {false, 2, 2, 2, 2, 2, 2, 2, 2, 2.0f, 2.0, "s2", "u2", "{j:2}", "{y:2}", 0, 0, 0, 0, {0,0} }, - {false, -3, -3, -3, -3, 3, 3, 3, 3, -3.0f, -3.0, "s3", "u3", "{j:3}", "{y:3}", 0, 0, 0, 0, {0,0} }, - {false, -4, -4, -4, -4, 4, 4, 4, 4, 4.0f, 4.0, "s4", "u4", "{j:4}", "{y:4}", 0, 0, 0, 0, {0,0} }, + {false, -1, -1, -1, -1, 1, 1, 1, 1, -1.0f, -1.0, "s1", "u1", "{\"j\":1}", "{y:1}", 0, 0, 0, 0, "{\"jd\":1}" }, + {false, 2, 2, 2, 2, 2, 2, 2, 2, 2.0f, 2.0, "s2", "u2", "{\"j\":2}", "{y:2}", 0, 0, 0, 0, "{\"jd\":1}" }, + {false, -3, -3, -3, -3, 3, 3, 3, 3, -3.0f, -3.0, "s3", "u3", "{\"j\":3}", "{y:3}", 0, 0, 0, 0, "{\"jd\":1}" }, + {false, -4, -4, -4, -4, 4, 4, 4, 4, 4.0f, 4.0, "s4", "u4", "{\"j\":4}", "{y:4}", 0, 0, 0, 0, "{\"jd\":1}" }, }; return rows; } @@ -547,8 +569,7 @@ Y_UNIT_TEST_SUITE(ArrowTest) { TVector<TOwnedCellVec> cellRows; for (const TDataRow& row : rows) { - NKikimr::TDbTupleRef value = row.ToDbTupleRef(); - cellRows.push_back(TOwnedCellVec(value.Cells())); + cellRows.push_back(TOwnedCellVec(row.SerializedCells())); } std::shared_ptr<arrow::RecordBatch> batch = VectorToBatch(rows); @@ -569,6 +590,7 @@ Y_UNIT_TEST_SUITE(ArrowTest) { NArrow::TArrowToYdbConverter toYdbConverter(TDataRow::MakeYdbSchema(), rowWriter); TString errStr; bool ok = toYdbConverter.Process(*batch, errStr); + Cerr << "Process: " << errStr << "\n"; UNIT_ASSERT(ok); UNIT_ASSERT_VALUES_EQUAL(cellRows.size(), rowWriter.Rows.size()); diff --git a/ydb/core/tx/columnshard/engines/CMakeLists.txt b/ydb/core/tx/columnshard/engines/CMakeLists.txt index b38105c2716..10fd75a948d 100644 --- a/ydb/core/tx/columnshard/engines/CMakeLists.txt +++ b/ydb/core/tx/columnshard/engines/CMakeLists.txt @@ -21,6 +21,8 @@ target_link_libraries(tx-columnshard-engines PUBLIC ydb-core-scheme ydb-core-tablet ydb-core-tablet_flat + udf-service-exception_policy + yql-sql-pg_dummy ) target_sources(tx-columnshard-engines PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/column_engine_logs.cpp diff --git a/ydb/core/tx/tx_proxy/upload_rows_common_impl.h b/ydb/core/tx/tx_proxy/upload_rows_common_impl.h index 07a84594999..c05c11e6e28 100644 --- a/ydb/core/tx/tx_proxy/upload_rows_common_impl.h +++ b/ydb/core/tx/tx_proxy/upload_rows_common_impl.h @@ -148,6 +148,7 @@ protected: TVector<std::pair<TString, NScheme::TTypeId>> SrcColumns; // source columns in CSV could have any order TVector<std::pair<TString, NScheme::TTypeId>> YdbSchema; THashMap<ui32, size_t> Id2Position; // columnId -> its position in YdbSchema + THashMap<TString, NScheme::TTypeId> ColumnsToConvert; bool WriteToTableShadow = false; bool AllowWriteToPrivateTable = false; @@ -405,6 +406,12 @@ private: Id2Position[columnId] = position; YdbSchema[position] = std::make_pair(ValueColumnNames[i], ValueColumnPositions[i].Type); } + + for (const auto& [colName, colType] : YdbSchema) { + if (NArrow::TArrowToYdbConverter::NeedDataConversion(colType)) { + ColumnsToConvert[colName] = colType; + } + } } if (!keyColumnsLeft.empty()) { @@ -497,8 +504,11 @@ private: return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, errorMessage, ctx); } - if (isOlapTable && !ExtractBatch(errorMessage)) { - return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, errorMessage, ctx); + if (isOlapTable) { + // TUploadRowsRPCPublic::ExtractBatch() - converted JsonDocument, DynNumbers, ... + if (!ExtractBatch(errorMessage)) { + return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, errorMessage, ctx); + } } else { FindMinMaxKeys(); } @@ -507,10 +517,25 @@ private: case EUploadSource::ArrowBatch: case EUploadSource::CSV: { - if (!ExtractBatch(errorMessage)) { - return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, errorMessage, ctx); - } - if (!isOlapTable) { + if (isOlapTable) { + // TUploadColumnsRPCPublic::ExtractBatch() - NOT converted JsonDocument, DynNumbers, ... + if (!ExtractBatch(errorMessage)) { + return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, errorMessage, ctx); + } + // Explicit types conversion + if (!ColumnsToConvert.empty()) { + Batch = NArrow::ConvertColumns(Batch, ColumnsToConvert); + if (!Batch) { + errorMessage = "Cannot upsert arrow batch: one of data types has no conversion"; + return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, errorMessage, ctx); + } + } + } else { + // TUploadColumnsRPCPublic::ExtractBatch() - NOT converted JsonDocument, DynNumbers, ... + if (!ExtractBatch(errorMessage)) { + return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, errorMessage, ctx); + } + // Implicit types conversion inside ExtractRows(), in TArrowToYdbConverter if (!ExtractRows(errorMessage)) { return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, errorMessage, ctx); } diff --git a/ydb/services/ydb/ydb_bulk_upsert_olap_ut.cpp b/ydb/services/ydb/ydb_bulk_upsert_olap_ut.cpp index b668857c30c..4bb0e9db428 100644 --- a/ydb/services/ydb/ydb_bulk_upsert_olap_ut.cpp +++ b/ydb/services/ydb/ydb_bulk_upsert_olap_ut.cpp @@ -11,18 +11,62 @@ using namespace NYdb; namespace { -// TODO: there's no way to read all data via LongTx Read. It returns some part of result. -TString Read(TDriver& connection, const TString& tablePath) { - NYdb::NLongTx::TClient client(connection); - - NLongTx::TLongTxBeginResult resBeginTx = client.BeginReadTx().GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL(resBeginTx.Status().GetStatus(), EStatus::SUCCESS); - - auto txId = resBeginTx.GetResult().tx_id(); +ui32 ScanQuerySelect(NYdb::NTable::TTableClient client, const TString& tablePath, + const std::vector<std::pair<TString, NYdb::EPrimitiveType>>& ydbSchema = TTestOlap::PublicSchema()) { + auto query = Sprintf("SELECT * FROM `%s`", tablePath.c_str()); + + // Executes scan query + auto result = client.StreamExecuteScanQuery(query).GetValueSync(); + if (!result.IsSuccess()) { + Cerr << "ScanQuery execution failure: " << result.GetIssues().ToString() << Endl; + return 0; + } - NLongTx::TLongTxReadResult resRead = client.Read(txId, tablePath).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL(resRead.Status().GetStatus(), EStatus::SUCCESS); - return resRead.GetResult().data().data(); + ui32 numRows = 0; + bool eos = false; + Cout << "ScanQuery:" << Endl; + while (!eos) { + auto streamPart = result.ReadNext().ExtractValueSync(); + if (!streamPart.IsSuccess()) { + eos = true; + if (!streamPart.EOS()) { + Cerr << "ScanQuery execution failure: " << streamPart.GetIssues().ToString() << Endl; + return 0; + } + continue; + } + + if (streamPart.HasResultSet()) { + auto rs = streamPart.ExtractResultSet(); + auto columns = rs.GetColumnsMeta(); + + TResultSetParser parser(rs); + while (parser.TryNextRow()) { + for (auto& [colName, colType] : ydbSchema) { + switch (colType) { + case NYdb::EPrimitiveType::Timestamp: + Cout << parser.ColumnParser(colName).GetOptionalTimestamp() << ", "; + break; + case NYdb::EPrimitiveType::Utf8: + Cout << parser.ColumnParser(colName).GetOptionalUtf8() << ", "; + break; + case NYdb::EPrimitiveType::Int32: + Cout << parser.ColumnParser(colName).GetOptionalInt32() << ", "; + break; + case NYdb::EPrimitiveType::JsonDocument: + Cout << parser.ColumnParser(colName).GetOptionalJsonDocument() << ", "; + break; + default: + Cout << "<other>, "; + break; + } + } + Cout << Endl; + ++numRows; + } + } + } + return numRows; } } @@ -60,13 +104,9 @@ Y_UNIT_TEST_SUITE(YdbTableBulkUpsertOlap) { } Cerr << "Upsert done: " << TInstant::Now() - start << Endl; - { // read with long tx read - TString readRes = Read(connection, tablePath); - UNIT_ASSERT(!readRes.empty()); - - auto batch = NArrow::DeserializeBatch(readRes, schema); - UNIT_ASSERT(batch); - UNIT_ASSERT(batch->num_rows() > 0); + { // Read all + ui32 numRows = ScanQuerySelect(client, tablePath); + UNIT_ASSERT_GT(numRows, 0); } // Negatives @@ -151,13 +191,9 @@ Y_UNIT_TEST_SUITE(YdbTableBulkUpsertOlap) { } Cerr << "Upsert done: " << TInstant::Now() - start << Endl; - { // read with long tx read - TString readRes = Read(connection, tablePath); - UNIT_ASSERT(!readRes.empty()); - - auto batch = NArrow::DeserializeBatch(readRes, schema); - UNIT_ASSERT(batch); - UNIT_ASSERT(batch->num_rows() > 0); + { // Read all + ui32 numRows = ScanQuerySelect(client, tablePath); + UNIT_ASSERT_GT(numRows, 0); } // Negatives @@ -230,11 +266,11 @@ Y_UNIT_TEST_SUITE(YdbTableBulkUpsertOlap) { } { // Big CSV batch - auto bigBatch = TTestOlap::SampleBatch(true, 150000); + auto bigBatch = TTestOlap::SampleBatch(true, 130000); ui32 batchSize = NArrow::GetBatchDataSize(bigBatch); Cerr << "rows: " << bigBatch->num_rows() << " batch size: " << batchSize << Endl; UNIT_ASSERT(batchSize > 15 * 1024 * 1024); - UNIT_ASSERT(batchSize < 20 * 1024 * 1024); + UNIT_ASSERT(batchSize < 17 * 1024 * 1024); TString bigCsv = TTestOlap::ToCSV(bigBatch); @@ -246,11 +282,11 @@ Y_UNIT_TEST_SUITE(YdbTableBulkUpsertOlap) { } { // Too big CSV batch - auto bigBatch = TTestOlap::SampleBatch(true, 200000); // 2 shards, greater then 8 Mb per shard + auto bigBatch = TTestOlap::SampleBatch(true, 150000); // 2 shards, greater then 8 Mb per shard ui32 batchSize = NArrow::GetBatchDataSize(bigBatch); Cerr << "rows: " << bigBatch->num_rows() << " batch size: " << batchSize << Endl; - UNIT_ASSERT(batchSize > 20 * 1024 * 1024); - UNIT_ASSERT(batchSize < 30 * 1024 * 1024); + UNIT_ASSERT(batchSize > 16 * 1024 * 1024); + UNIT_ASSERT(batchSize < 20 * 1024 * 1024); TString bigCsv = TTestOlap::ToCSV(bigBatch); @@ -304,6 +340,11 @@ Y_UNIT_TEST_SUITE(YdbTableBulkUpsertOlap) { } Cerr << "Upsert done: " << TInstant::Now() - start << Endl; + { // Read all + ui32 numRows = ScanQuerySelect(client, tablePath); + UNIT_ASSERT_GT(numRows, 0); + } + // Read auto res = session.ExecuteDataQuery("SELECT count(*) AS _cnt FROM [/Root/LogsX];", NYdb::NTable::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); diff --git a/ydb/services/ydb/ydb_common_ut.h b/ydb/services/ydb/ydb_common_ut.h index c2966ba7440..a3a9bbc1bcd 100644 --- a/ydb/services/ydb/ydb_common_ut.h +++ b/ydb/services/ydb/ydb_common_ut.h @@ -189,7 +189,7 @@ struct TTestOlap { { "uid", NYdb::EPrimitiveType::Utf8 }, { "level", NYdb::EPrimitiveType::Int32 }, { "message", NYdb::EPrimitiveType::Utf8 }, - { "json_payload", NYdb::EPrimitiveType::String }, + { "json_payload", NYdb::EPrimitiveType::JsonDocument }, { "ingested_at", NYdb::EPrimitiveType::Timestamp }, { "saved_at", NYdb::EPrimitiveType::Timestamp }, { "request_id", NYdb::EPrimitiveType::Utf8 } @@ -209,7 +209,6 @@ struct TTestOlap { const TString& storeName = StoreName, const TString& tableName = TableName) { TString tableDescr = Sprintf(R"( Name: "%s" - #MetaShardCount: 1 ColumnShardCount: 4 SchemaPresets { Name: "default" @@ -220,7 +219,7 @@ struct TTestOlap { Columns { Name: "uid" Type: "Utf8" } Columns { Name: "level" Type: "Int32" } Columns { Name: "message" Type: "Utf8" } - Columns { Name: "json_payload" Type: "Json" } + Columns { Name: "json_payload" Type: "JsonDocument" } Columns { Name: "ingested_at" Type: "Timestamp" } Columns { Name: "saved_at" Type: "Timestamp" } Columns { Name: "request_id" Type: "Utf8" } @@ -279,7 +278,7 @@ struct TTestOlap { Y_VERIFY(NArrow::Append<arrow::StringType>(*builders[3], s)); Y_VERIFY(NArrow::Append<arrow::Int32Type>(*builders[4], i)); Y_VERIFY(NArrow::Append<arrow::StringType>(*builders[5], s + "str")); - Y_VERIFY(NArrow::Append<arrow::BinaryType>(*builders[6], s + "bin")); + Y_VERIFY(NArrow::Append<arrow::BinaryType>(*builders[6], "{ \"value\": " + s + " }")); Y_VERIFY(NArrow::Append<arrow::StringType>(*builders[9], s + "str")); } |