summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorArtem Zuikov <[email protected]>2022-06-09 14:39:36 +0300
committerArtem Zuikov <[email protected]>2022-06-09 14:39:36 +0300
commit0e7645a1547e5ac0e618721fa207c35ee444c247 (patch)
treeea6ac050f3da3cacf18135fb62f4a7c452e25f13
parentc04b663c7bb4b750deeb8f48f620497ec13da8fa (diff)
KIKIMR-15046: fix upsert JsonDocument via CSV or Arrow
ref:1b132310111e00aeeae70bb01445128b1e515d43
-rw-r--r--CMakeLists.darwin.txt2
-rw-r--r--CMakeLists.linux.txt2
-rw-r--r--ydb/core/formats/CMakeLists.txt5
-rw-r--r--ydb/core/formats/arrow_helpers.cpp120
-rw-r--r--ydb/core/formats/arrow_helpers.h15
-rw-r--r--ydb/core/formats/ut/CMakeLists.darwin.txt3
-rw-r--r--ydb/core/formats/ut/CMakeLists.linux.txt3
-rw-r--r--ydb/core/formats/ut_arrow.cpp72
-rw-r--r--ydb/core/tx/columnshard/engines/CMakeLists.txt2
-rw-r--r--ydb/core/tx/tx_proxy/upload_rows_common_impl.h37
-rw-r--r--ydb/services/ydb/ydb_bulk_upsert_olap_ut.cpp101
-rw-r--r--ydb/services/ydb/ydb_common_ut.h7
12 files changed, 300 insertions, 69 deletions
diff --git a/CMakeLists.darwin.txt b/CMakeLists.darwin.txt
index a98538a6d61..1b2cf3f02c1 100644
--- a/CMakeLists.darwin.txt
+++ b/CMakeLists.darwin.txt
@@ -789,6 +789,7 @@ add_subdirectory(library/cpp/containers/ring_buffer)
add_subdirectory(ydb/core/security)
add_subdirectory(ydb/core/tx/columnshard)
add_subdirectory(ydb/core/tx/columnshard/engines)
+add_subdirectory(ydb/library/yql/public/udf/service/exception_policy)
add_subdirectory(ydb/core/tx/coordinator)
add_subdirectory(ydb/core/tx/long_tx_service)
add_subdirectory(ydb/core/tx/mediator)
@@ -827,7 +828,6 @@ add_subdirectory(ydb/library/yql/providers/solomon/expr_nodes)
add_subdirectory(ydb/library/yql/providers/ydb/actors)
add_subdirectory(ydb/library/yql/providers/ydb/comp_nodes)
add_subdirectory(ydb/core/yq/libs/logs)
-add_subdirectory(ydb/library/yql/public/udf/service/exception_policy)
add_subdirectory(ydb/services/auth)
add_subdirectory(ydb/services/cms)
add_subdirectory(ydb/services/discovery)
diff --git a/CMakeLists.linux.txt b/CMakeLists.linux.txt
index a9e3cd5b372..7ad724d2a11 100644
--- a/CMakeLists.linux.txt
+++ b/CMakeLists.linux.txt
@@ -869,6 +869,7 @@ add_subdirectory(library/cpp/containers/ring_buffer)
add_subdirectory(ydb/core/security)
add_subdirectory(ydb/core/tx/columnshard)
add_subdirectory(ydb/core/tx/columnshard/engines)
+add_subdirectory(ydb/library/yql/public/udf/service/exception_policy)
add_subdirectory(ydb/core/tx/coordinator)
add_subdirectory(ydb/core/tx/long_tx_service)
add_subdirectory(ydb/core/tx/mediator)
@@ -925,7 +926,6 @@ add_subdirectory(ydb/library/yql/providers/solomon/expr_nodes)
add_subdirectory(ydb/library/yql/providers/ydb/actors)
add_subdirectory(ydb/library/yql/providers/ydb/comp_nodes)
add_subdirectory(ydb/core/yq/libs/logs)
-add_subdirectory(ydb/library/yql/public/udf/service/exception_policy)
add_subdirectory(ydb/services/auth)
add_subdirectory(ydb/services/cms)
add_subdirectory(ydb/services/discovery)
diff --git a/ydb/core/formats/CMakeLists.txt b/ydb/core/formats/CMakeLists.txt
index f61a7c54d19..993180c9e4f 100644
--- a/ydb/core/formats/CMakeLists.txt
+++ b/ydb/core/formats/CMakeLists.txt
@@ -8,11 +8,16 @@
add_library(ydb-core-formats)
+target_compile_options(ydb-core-formats PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
target_link_libraries(ydb-core-formats PUBLIC
contrib-libs-cxxsupp
yutil
libs-apache-arrow
ydb-core-scheme
+ ydb-library-binary_json
+ ydb-library-dynumber
)
target_sources(ydb-core-formats PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow_batch_builder.cpp
diff --git a/ydb/core/formats/arrow_helpers.cpp b/ydb/core/formats/arrow_helpers.cpp
index 3e1e1b0444a..72ff6fd234e 100644
--- a/ydb/core/formats/arrow_helpers.cpp
+++ b/ydb/core/formats/arrow_helpers.cpp
@@ -2,6 +2,10 @@
#include "switch_type.h"
#include "one_batch_input_stream.h"
#include "merging_sorted_input_stream.h"
+
+#include <ydb/library/binary_json/write.h>
+#include <ydb/library/dynumber/dynumber.h>
+#include <util/memory/pool.h>
#include <util/system/yassert.h>
#include <contrib/libs/apache/arrow/cpp/src/arrow/io/memory.h>
#include <contrib/libs/apache/arrow/cpp/src/arrow/ipc/reader.h>
@@ -912,6 +916,99 @@ std::shared_ptr<arrow::RecordBatch> SortBatch(const std::shared_ptr<arrow::Recor
return Reorder(batch, sortPermutation);
}
+// Converts a single cell's textual payload into the internal storage format
+// for types that require it: DyNumber text -> packed DyNumber bytes, JSON text
+// -> BinaryJson. The converted bytes are copied into memPool, so the returned
+// cell is only valid until the pool is cleared. Decimal is explicitly rejected.
+// Returns false and fills errorMessage on malformed input; other types pass
+// through unchanged.
+static bool ConvertData(TCell& cell, const NScheme::TTypeId& colType, TMemoryPool& memPool, TString& errorMessage) {
+    switch (colType) {
+    case NScheme::NTypeIds::DyNumber: {
+        const auto dyNumber = NDyNumber::ParseDyNumberString(cell.AsBuf());
+        if (!dyNumber.Defined()) {
+            errorMessage = "Invalid DyNumber string representation";
+            return false;
+        }
+        // Copy into the pool: the parsed value is a temporary.
+        const auto dyNumberInPool = memPool.AppendString(TStringBuf(*dyNumber));
+        cell = TCell(dyNumberInPool.data(), dyNumberInPool.size());
+        break;
+    }
+    case NScheme::NTypeIds::JsonDocument: {
+        const auto binaryJson = NBinaryJson::SerializeToBinaryJson(cell.AsBuf());
+        if (!binaryJson.Defined()) {
+            errorMessage = "Invalid JSON for JsonDocument provided";
+            return false;
+        }
+        const auto saved = memPool.AppendString(TStringBuf(binaryJson->Data(), binaryJson->Size()));
+        cell = TCell(saved.data(), saved.size());
+        break;
+    }
+    case NScheme::NTypeIds::Decimal:
+        errorMessage = "Decimal conversion is not supported yet";
+        return false;
+    default:
+        break;
+    }
+    return true;
+}
+
+// Converts a BINARY arrow column holding textual DyNumber / JSON values into
+// the internal storage format (packed DyNumber / BinaryJson). Returns nullptr
+// for unsupported types (Decimal), for a non-BINARY source column, or when any
+// value fails to parse.
+static std::shared_ptr<arrow::Array> ConvertColumn(const std::shared_ptr<arrow::Array>& column,
+                                                   NScheme::TTypeId colType) {
+    if (colType == NScheme::NTypeIds::Decimal) {
+        return {};
+    }
+
+    if (column->type()->id() != arrow::Type::BINARY) {
+        return {};
+    }
+
+    auto& binaryArray = static_cast<arrow::BinaryArray&>(*column);
+    arrow::BinaryBuilder builder;
+    if (!builder.Reserve(binaryArray.length()).ok()) { // BUGFIX: Reserve() status was discarded
+        return {};
+    }
+    // TODO: ReserveData
+
+    switch (colType) {
+    case NScheme::NTypeIds::DyNumber: {
+        for (i32 i = 0; i < binaryArray.length(); ++i) {
+            auto value = binaryArray.Value(i);
+            const auto dyNumber = NDyNumber::ParseDyNumberString(TStringBuf(value.data(), value.size()));
+            if (!dyNumber.Defined() || !builder.Append((*dyNumber).data(), (*dyNumber).size()).ok()) {
+                return {};
+            }
+        }
+        // BUGFIX: missing break caused fall-through into the JsonDocument case,
+        // which re-processed (and double-appended) every DyNumber value.
+        break;
+    }
+    case NScheme::NTypeIds::JsonDocument: {
+        for (i32 i = 0; i < binaryArray.length(); ++i) {
+            auto value = binaryArray.Value(i);
+            const auto binaryJson = NBinaryJson::SerializeToBinaryJson(TStringBuf(value.data(), value.size()));
+            if (!binaryJson.Defined() || !builder.Append(binaryJson->Data(), binaryJson->Size()).ok()) {
+                return {};
+            }
+        }
+        break; // BUGFIX: missing break (fell through to default; harmless but fragile)
+    }
+    default:
+        break;
+    }
+
+    std::shared_ptr<arrow::BinaryArray> result;
+    if (!builder.Finish(&result).ok()) {
+        return {};
+    }
+    return result;
+}
+
+// Rebuilds a record batch with the columns listed in columnsToConvert replaced
+// by their converted form (see ConvertColumn). Columns not listed are reused
+// as-is. Returns nullptr if any listed column fails to convert.
+// NOTE(review): the result keeps the ORIGINAL schema while converted columns
+// change representation; presumably callers rely on the arrow type staying
+// BINARY for these columns — confirm against ConvertColumn.
+std::shared_ptr<arrow::RecordBatch> ConvertColumns(const std::shared_ptr<arrow::RecordBatch>& batch,
+                                                   const THashMap<TString, NScheme::TTypeId>& columnsToConvert)
+{
+    std::vector<std::shared_ptr<arrow::Array>> columns = batch->columns();
+    for (i32 i = 0; i < batch->num_columns(); ++i) {
+        auto& colName = batch->column_name(i);
+        auto it = columnsToConvert.find(TString(colName.data(), colName.size()));
+        if (it != columnsToConvert.end()) {
+            columns[i] = ConvertColumn(columns[i], it->second);
+            if (!columns[i]) {
+                return {};
+            }
+        }
+    }
+    return arrow::RecordBatch::Make(batch->schema(), batch->num_rows(), columns);
+}
+
bool TArrowToYdbConverter::Process(const arrow::RecordBatch& batch, TString& errorMessage) {
std::vector<std::shared_ptr<arrow::Array>> allColumns;
allColumns.reserve(YdbSchema.size());
@@ -929,6 +1026,8 @@ bool TArrowToYdbConverter::Process(const arrow::RecordBatch& batch, TString& err
std::vector<TSmallVec<TCell>> cells;
i64 row = 0;
+    TMemoryPool memPool(256); // for conversions
+
#if 1 // optimization
static constexpr i32 unroll = 32;
cells.reserve(unroll);
@@ -959,6 +1058,16 @@ bool TArrowToYdbConverter::Process(const arrow::RecordBatch& batch, TString& err
<< " at column '" << colName << "'";
return false;
}
+
+ if (NeedDataConversion(colType)) {
+ memPool.Clear();
+ for (i32 i = 0; i < unroll; ++i) {
+ if (!ConvertData(cells[i][col], colType, memPool, errorMessage)) {
+ return false;
+ }
+ }
+ }
+
++col;
}
@@ -973,17 +1082,20 @@ bool TArrowToYdbConverter::Process(const arrow::RecordBatch& batch, TString& err
#endif
for (; row < batch.num_rows(); ++row) {
+ memPool.Clear();
+
ui32 col = 0;
for (auto& [colName, colType] : YdbSchema) {
auto& column = allColumns[col];
+ auto& curCell = cells[0][col];
if (column->IsNull(row)) {
- cells[0][col] = TCell();
+ curCell = TCell();
continue;
}
bool success = SwitchYqlTypeToArrowType(colType, [&]<typename TType>(TTypeWrapper<TType> typeHolder) {
Y_UNUSED(typeHolder);
- cells[0][col] = MakeCell<typename arrow::TypeTraits<TType>::ArrayType>(column, row);
+ curCell = MakeCell<typename arrow::TypeTraits<TType>::ArrayType>(column, row);
return true;
});
@@ -992,6 +1104,10 @@ bool TArrowToYdbConverter::Process(const arrow::RecordBatch& batch, TString& err
<< " at column '" << colName << "'";
return false;
}
+
+ if (!ConvertData(curCell, colType, memPool, errorMessage)) {
+ return false;
+ }
++col;
}
diff --git a/ydb/core/formats/arrow_helpers.h b/ydb/core/formats/arrow_helpers.h
index cd3ec9f865f..19d15052700 100644
--- a/ydb/core/formats/arrow_helpers.h
+++ b/ydb/core/formats/arrow_helpers.h
@@ -181,6 +181,18 @@ private:
}
public:
+    // Returns true for column types whose cell payload must be converted
+    // before storage (DyNumber, JsonDocument) or is not yet supported
+    // (Decimal — conversion of it currently fails downstream in ConvertData).
+    static bool NeedDataConversion(const NScheme::TTypeId& colType) {
+        switch (colType) {
+        case NScheme::NTypeIds::DyNumber:
+        case NScheme::NTypeIds::JsonDocument:
+        case NScheme::NTypeIds::Decimal:
+            return true;
+        default:
+            break;
+        }
+        return false;
+    }
+
TArrowToYdbConverter(const TVector<std::pair<TString, NScheme::TTypeId>>& ydbSchema, IRowWriter& rowWriter)
: YdbSchema(ydbSchema)
, RowWriter(rowWriter)
@@ -198,6 +210,9 @@ public:
virtual void AddRow(const TConstArrayRef<TCell> &cells) = 0;
};
+std::shared_ptr<arrow::RecordBatch> ConvertColumns(const std::shared_ptr<arrow::RecordBatch>& batch,
+ const THashMap<TString, NScheme::TTypeId>& columnsToConvert);
+
inline bool HasNulls(const std::shared_ptr<arrow::Array>& column) {
return column->null_bitmap_data();
}
diff --git a/ydb/core/formats/ut/CMakeLists.darwin.txt b/ydb/core/formats/ut/CMakeLists.darwin.txt
index 63729f5691e..2f19d1653bc 100644
--- a/ydb/core/formats/ut/CMakeLists.darwin.txt
+++ b/ydb/core/formats/ut/CMakeLists.darwin.txt
@@ -10,6 +10,7 @@
add_executable(ydb-core-formats-ut)
target_compile_options(ydb-core-formats-ut PRIVATE
-Wno-unused-parameter
+ -DUSE_CURRENT_UDF_ABI_VERSION
)
target_include_directories(ydb-core-formats-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/formats
@@ -22,6 +23,8 @@ target_link_libraries(ydb-core-formats-ut PUBLIC
ydb-core-formats
libs-apache-arrow
ydb-core-base
+ udf-service-exception_policy
+ yql-sql-pg_dummy
)
target_link_options(ydb-core-formats-ut PRIVATE
-Wl,-no_deduplicate
diff --git a/ydb/core/formats/ut/CMakeLists.linux.txt b/ydb/core/formats/ut/CMakeLists.linux.txt
index 286bb5eb02c..38c63da7034 100644
--- a/ydb/core/formats/ut/CMakeLists.linux.txt
+++ b/ydb/core/formats/ut/CMakeLists.linux.txt
@@ -10,6 +10,7 @@
add_executable(ydb-core-formats-ut)
target_compile_options(ydb-core-formats-ut PRIVATE
-Wno-unused-parameter
+ -DUSE_CURRENT_UDF_ABI_VERSION
)
target_include_directories(ydb-core-formats-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/formats
@@ -23,6 +24,8 @@ target_link_libraries(ydb-core-formats-ut PUBLIC
ydb-core-formats
libs-apache-arrow
ydb-core-base
+ udf-service-exception_policy
+ yql-sql-pg_dummy
)
target_link_options(ydb-core-formats-ut PRIVATE
-ldl
diff --git a/ydb/core/formats/ut_arrow.cpp b/ydb/core/formats/ut_arrow.cpp
index 344bbe7cb8e..291300bdd31 100644
--- a/ydb/core/formats/ut_arrow.cpp
+++ b/ydb/core/formats/ut_arrow.cpp
@@ -2,6 +2,7 @@
#include <ydb/core/formats/arrow_helpers.h>
#include <ydb/core/formats/one_batch_input_stream.h>
#include <ydb/core/formats/merging_sorted_input_stream.h>
+#include <ydb/library/binary_json/write.h>
#include <library/cpp/testing/unittest/registar.h>
#include <util/string/printf.h>
#include <contrib/libs/apache/arrow/cpp/src/arrow/api.h>
@@ -35,8 +36,8 @@ struct TDataRow {
NTypeIds::Datetime,
NTypeIds::Timestamp,
NTypeIds::Interval,
- NTypeIds::Decimal
- // NTypeIds::PairUi64Ui64, NTypeIds::ActorId, NTypeIds::StepOrderId
+ NTypeIds::JsonDocument,
+ // TODO: DyNumber, Decimal
};
bool Bool;
@@ -58,7 +59,8 @@ struct TDataRow {
ui32 Datetime;
i64 Timestamp;
i64 Interval;
- ui64 Decimal[2];
+ std::string JsonDocument;
+ //ui64 Decimal[2];
bool operator == (const TDataRow& r) const {
return (Bool == r.Bool) &&
@@ -80,7 +82,8 @@ struct TDataRow {
(Datetime == r.Datetime) &&
(Timestamp == r.Timestamp) &&
(Interval == r.Interval) &&
- (Decimal[0] == r.Decimal[0] && Decimal[1] == r.Decimal[1]);
+ (JsonDocument == r.JsonDocument);
+ //(Decimal[0] == r.Decimal[0] && Decimal[1] == r.Decimal[1]);
}
static std::shared_ptr<arrow::Schema> MakeArrowSchema() {
@@ -104,7 +107,8 @@ struct TDataRow {
arrow::field("datetime", arrow::uint32()),
arrow::field("ts", arrow::timestamp(arrow::TimeUnit::TimeUnit::MICRO)),
arrow::field("ival", arrow::duration(arrow::TimeUnit::TimeUnit::MICRO)),
- arrow::field("dec", arrow::decimal(NScheme::DECIMAL_PRECISION, NScheme::DECIMAL_SCALE)),
+ arrow::field("json_doc", arrow::binary()),
+ //arrow::field("dec", arrow::decimal(NScheme::DECIMAL_PRECISION, NScheme::DECIMAL_SCALE)),
};
return std::make_shared<arrow::Schema>(fields);
@@ -131,7 +135,8 @@ struct TDataRow {
{"datetime", NTypeIds::Datetime },
{"ts", NTypeIds::Timestamp },
{"ival", NTypeIds::Interval },
- {"dec", NTypeIds::Decimal }
+ {"json_doc", NTypeIds::JsonDocument },
+ //{"dec", NTypeIds::Decimal }
};
return columns;
}
@@ -157,10 +162,22 @@ struct TDataRow {
Cells[16] = TCell::Make<ui32>(Datetime);
Cells[17] = TCell::Make<i64>(Timestamp);
Cells[18] = TCell::Make<i64>(Interval);
- Cells[19] = TCell((const char *)&Decimal[0], 16);
+ Cells[19] = TCell(JsonDocument.data(), JsonDocument.size());
+ //Cells[19] = TCell((const char *)&Decimal[0], 16);
return NKikimr::TDbTupleRef(Types, Cells, 20);
}
+
+ TOwnedCellVec SerializedCells() const {
+ NKikimr::TDbTupleRef value = ToDbTupleRef();
+ TVector<TCell> cells(value.Cells().data(), value.Cells().data() + value.Cells().size());
+
+ auto binaryJson = NBinaryJson::SerializeToBinaryJson(TStringBuf(JsonDocument.data(), JsonDocument.size()));
+ UNIT_ASSERT(binaryJson.Defined());
+
+ cells[19] = TCell(binaryJson->Data(), binaryJson->Size());
+ return TOwnedCellVec(cells);
+ }
};
@@ -198,18 +215,19 @@ std::vector<TDataRow> ToVector(const std::shared_ptr<T>& table) {
auto arts = std::static_pointer_cast<arrow::TimestampArray>(GetColumn(*table, 17));
auto arival = std::static_pointer_cast<arrow::DurationArray>(GetColumn(*table, 18));
- auto ardec = std::static_pointer_cast<arrow::Decimal128Array>(GetColumn(*table, 19));
+ auto arjd = std::static_pointer_cast<arrow::BinaryArray>(GetColumn(*table, 19));
+ //auto ardec = std::static_pointer_cast<arrow::Decimal128Array>(GetColumn(*table, 19));
for (int64_t i = 0; i < table->num_rows(); ++i) {
- ui64 dec[2];
- memcpy(dec, ardec->Value(i), 16);
+ //ui64 dec[2];
+ //memcpy(dec, ardec->Value(i), 16);
TDataRow r{ arbool->Value(i),
ari8->Value(i), ari16->Value(i), ari32->Value(i), ari64->Value(i),
aru8->Value(i), aru16->Value(i), aru32->Value(i), aru64->Value(i),
arf32->Value(i), arf64->Value(i),
arstr->GetString(i), arutf->GetString(i), arj->GetString(i), ary->GetString(i),
- ard->Value(i), ardt->Value(i), arts->Value(i), arival->Value(i),
- {dec[0], dec[1]}
+ ard->Value(i), ardt->Value(i), arts->Value(i), arival->Value(i), arjd->GetString(i)
+ //{dec[0], dec[1]}
};
rows.emplace_back(std::move(r));
}
@@ -223,7 +241,7 @@ public:
TDataRowTableBuilder()
: Bts(arrow::timestamp(arrow::TimeUnit::TimeUnit::MICRO), arrow::default_memory_pool())
, Bival(arrow::duration(arrow::TimeUnit::TimeUnit::MICRO), arrow::default_memory_pool())
- , Bdec(arrow::decimal(NScheme::DECIMAL_PRECISION, NScheme::DECIMAL_SCALE), arrow::default_memory_pool())
+ //, Bdec(arrow::decimal(NScheme::DECIMAL_PRECISION, NScheme::DECIMAL_SCALE), arrow::default_memory_pool())
{}
void AddRow(const TDataRow& row) {
@@ -249,7 +267,8 @@ public:
UNIT_ASSERT(Bts.Append(row.Timestamp).ok());
UNIT_ASSERT(Bival.Append(row.Interval).ok());
- UNIT_ASSERT(Bdec.Append((const char *)&row.Decimal).ok());
+ UNIT_ASSERT(Bjd.Append(row.JsonDocument).ok());
+ //UNIT_ASSERT(Bdec.Append((const char *)&row.Decimal).ok());
}
std::shared_ptr<arrow::Table> Finish() {
@@ -275,7 +294,8 @@ public:
std::shared_ptr<arrow::TimestampArray> arts;
std::shared_ptr<arrow::DurationArray> arival;
- std::shared_ptr<arrow::Decimal128Array> ardec;
+ std::shared_ptr<arrow::BinaryArray> arjd;
+ //std::shared_ptr<arrow::Decimal128Array> ardec;
UNIT_ASSERT(Bbool.Finish(&arbool).ok());
UNIT_ASSERT(Bi8.Finish(&ari8).ok());
@@ -299,7 +319,8 @@ public:
UNIT_ASSERT(Bts.Finish(&arts).ok());
UNIT_ASSERT(Bival.Finish(&arival).ok());
- UNIT_ASSERT(Bdec.Finish(&ardec).ok());
+ UNIT_ASSERT(Bjd.Finish(&arjd).ok());
+ //UNIT_ASSERT(Bdec.Finish(&ardec).ok());
std::shared_ptr<arrow::Schema> schema = TDataRow::MakeArrowSchema();
return arrow::Table::Make(schema, {
@@ -308,8 +329,8 @@ public:
aru8, aru16, aru32, aru64,
arf32, arf64,
arstr, arutf, arj, ary,
- ard, ardt, arts, arival,
- ardec
+ ard, ardt, arts, arival, arjd
+ //ardec
});
}
@@ -341,7 +362,8 @@ private:
arrow::UInt32Builder Bdt;
arrow::TimestampBuilder Bts;
arrow::DurationBuilder Bival;
- arrow::Decimal128Builder Bdec;
+ arrow::BinaryBuilder Bjd;
+ //arrow::Decimal128Builder Bdec;
};
std::shared_ptr<arrow::RecordBatch> VectorToBatch(const std::vector<struct TDataRow>& rows) {
@@ -360,10 +382,10 @@ std::shared_ptr<arrow::RecordBatch> VectorToBatch(const std::vector<struct TData
std::vector<TDataRow> TestRows() {
std::vector<TDataRow> rows = {
- {false, -1, -1, -1, -1, 1, 1, 1, 1, -1.0f, -1.0, "s1", "u1", "{j:1}", "{y:1}", 0, 0, 0, 0, {0,0} },
- {false, 2, 2, 2, 2, 2, 2, 2, 2, 2.0f, 2.0, "s2", "u2", "{j:2}", "{y:2}", 0, 0, 0, 0, {0,0} },
- {false, -3, -3, -3, -3, 3, 3, 3, 3, -3.0f, -3.0, "s3", "u3", "{j:3}", "{y:3}", 0, 0, 0, 0, {0,0} },
- {false, -4, -4, -4, -4, 4, 4, 4, 4, 4.0f, 4.0, "s4", "u4", "{j:4}", "{y:4}", 0, 0, 0, 0, {0,0} },
+ {false, -1, -1, -1, -1, 1, 1, 1, 1, -1.0f, -1.0, "s1", "u1", "{\"j\":1}", "{y:1}", 0, 0, 0, 0, "{\"jd\":1}" },
+ {false, 2, 2, 2, 2, 2, 2, 2, 2, 2.0f, 2.0, "s2", "u2", "{\"j\":2}", "{y:2}", 0, 0, 0, 0, "{\"jd\":1}" },
+ {false, -3, -3, -3, -3, 3, 3, 3, 3, -3.0f, -3.0, "s3", "u3", "{\"j\":3}", "{y:3}", 0, 0, 0, 0, "{\"jd\":1}" },
+ {false, -4, -4, -4, -4, 4, 4, 4, 4, 4.0f, 4.0, "s4", "u4", "{\"j\":4}", "{y:4}", 0, 0, 0, 0, "{\"jd\":1}" },
};
return rows;
}
@@ -547,8 +569,7 @@ Y_UNIT_TEST_SUITE(ArrowTest) {
TVector<TOwnedCellVec> cellRows;
for (const TDataRow& row : rows) {
- NKikimr::TDbTupleRef value = row.ToDbTupleRef();
- cellRows.push_back(TOwnedCellVec(value.Cells()));
+ cellRows.push_back(TOwnedCellVec(row.SerializedCells()));
}
std::shared_ptr<arrow::RecordBatch> batch = VectorToBatch(rows);
@@ -569,6 +590,7 @@ Y_UNIT_TEST_SUITE(ArrowTest) {
NArrow::TArrowToYdbConverter toYdbConverter(TDataRow::MakeYdbSchema(), rowWriter);
TString errStr;
bool ok = toYdbConverter.Process(*batch, errStr);
+ Cerr << "Process: " << errStr << "\n";
UNIT_ASSERT(ok);
UNIT_ASSERT_VALUES_EQUAL(cellRows.size(), rowWriter.Rows.size());
diff --git a/ydb/core/tx/columnshard/engines/CMakeLists.txt b/ydb/core/tx/columnshard/engines/CMakeLists.txt
index b38105c2716..10fd75a948d 100644
--- a/ydb/core/tx/columnshard/engines/CMakeLists.txt
+++ b/ydb/core/tx/columnshard/engines/CMakeLists.txt
@@ -21,6 +21,8 @@ target_link_libraries(tx-columnshard-engines PUBLIC
ydb-core-scheme
ydb-core-tablet
ydb-core-tablet_flat
+ udf-service-exception_policy
+ yql-sql-pg_dummy
)
target_sources(tx-columnshard-engines PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
diff --git a/ydb/core/tx/tx_proxy/upload_rows_common_impl.h b/ydb/core/tx/tx_proxy/upload_rows_common_impl.h
index 07a84594999..c05c11e6e28 100644
--- a/ydb/core/tx/tx_proxy/upload_rows_common_impl.h
+++ b/ydb/core/tx/tx_proxy/upload_rows_common_impl.h
@@ -148,6 +148,7 @@ protected:
TVector<std::pair<TString, NScheme::TTypeId>> SrcColumns; // source columns in CSV could have any order
TVector<std::pair<TString, NScheme::TTypeId>> YdbSchema;
THashMap<ui32, size_t> Id2Position; // columnId -> its position in YdbSchema
+ THashMap<TString, NScheme::TTypeId> ColumnsToConvert;
bool WriteToTableShadow = false;
bool AllowWriteToPrivateTable = false;
@@ -405,6 +406,12 @@ private:
Id2Position[columnId] = position;
YdbSchema[position] = std::make_pair(ValueColumnNames[i], ValueColumnPositions[i].Type);
}
+
+ for (const auto& [colName, colType] : YdbSchema) {
+ if (NArrow::TArrowToYdbConverter::NeedDataConversion(colType)) {
+ ColumnsToConvert[colName] = colType;
+ }
+ }
}
if (!keyColumnsLeft.empty()) {
@@ -497,8 +504,11 @@ private:
return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, errorMessage, ctx);
}
- if (isOlapTable && !ExtractBatch(errorMessage)) {
- return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, errorMessage, ctx);
+ if (isOlapTable) {
+            // TUploadRowsRPCPublic::ExtractBatch() - converted JsonDocument, DyNumbers, ...
+ if (!ExtractBatch(errorMessage)) {
+ return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, errorMessage, ctx);
+ }
} else {
FindMinMaxKeys();
}
@@ -507,10 +517,25 @@ private:
case EUploadSource::ArrowBatch:
case EUploadSource::CSV:
{
- if (!ExtractBatch(errorMessage)) {
- return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, errorMessage, ctx);
- }
- if (!isOlapTable) {
+ if (isOlapTable) {
+                    // TUploadColumnsRPCPublic::ExtractBatch() - NOT converted JsonDocument, DyNumbers, ...
+ if (!ExtractBatch(errorMessage)) {
+ return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, errorMessage, ctx);
+ }
+ // Explicit types conversion
+ if (!ColumnsToConvert.empty()) {
+ Batch = NArrow::ConvertColumns(Batch, ColumnsToConvert);
+ if (!Batch) {
+ errorMessage = "Cannot upsert arrow batch: one of data types has no conversion";
+ return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, errorMessage, ctx);
+ }
+ }
+ } else {
+                    // TUploadColumnsRPCPublic::ExtractBatch() - NOT converted JsonDocument, DyNumbers, ...
+ if (!ExtractBatch(errorMessage)) {
+ return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, errorMessage, ctx);
+ }
+ // Implicit types conversion inside ExtractRows(), in TArrowToYdbConverter
if (!ExtractRows(errorMessage)) {
return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, errorMessage, ctx);
}
diff --git a/ydb/services/ydb/ydb_bulk_upsert_olap_ut.cpp b/ydb/services/ydb/ydb_bulk_upsert_olap_ut.cpp
index b668857c30c..4bb0e9db428 100644
--- a/ydb/services/ydb/ydb_bulk_upsert_olap_ut.cpp
+++ b/ydb/services/ydb/ydb_bulk_upsert_olap_ut.cpp
@@ -11,18 +11,62 @@ using namespace NYdb;
namespace {
-// TODO: there's no way to read all data via LongTx Read. It returns some part of result.
-TString Read(TDriver& connection, const TString& tablePath) {
- NYdb::NLongTx::TClient client(connection);
-
- NLongTx::TLongTxBeginResult resBeginTx = client.BeginReadTx().GetValueSync();
- UNIT_ASSERT_VALUES_EQUAL(resBeginTx.Status().GetStatus(), EStatus::SUCCESS);
-
- auto txId = resBeginTx.GetResult().tx_id();
+ui32 ScanQuerySelect(NYdb::NTable::TTableClient client, const TString& tablePath,
+ const std::vector<std::pair<TString, NYdb::EPrimitiveType>>& ydbSchema = TTestOlap::PublicSchema()) {
+ auto query = Sprintf("SELECT * FROM `%s`", tablePath.c_str());
+
+ // Executes scan query
+ auto result = client.StreamExecuteScanQuery(query).GetValueSync();
+ if (!result.IsSuccess()) {
+ Cerr << "ScanQuery execution failure: " << result.GetIssues().ToString() << Endl;
+ return 0;
+ }
- NLongTx::TLongTxReadResult resRead = client.Read(txId, tablePath).GetValueSync();
- UNIT_ASSERT_VALUES_EQUAL(resRead.Status().GetStatus(), EStatus::SUCCESS);
- return resRead.GetResult().data().data();
+ ui32 numRows = 0;
+ bool eos = false;
+ Cout << "ScanQuery:" << Endl;
+ while (!eos) {
+ auto streamPart = result.ReadNext().ExtractValueSync();
+ if (!streamPart.IsSuccess()) {
+ eos = true;
+ if (!streamPart.EOS()) {
+ Cerr << "ScanQuery execution failure: " << streamPart.GetIssues().ToString() << Endl;
+ return 0;
+ }
+ continue;
+ }
+
+ if (streamPart.HasResultSet()) {
+ auto rs = streamPart.ExtractResultSet();
+ auto columns = rs.GetColumnsMeta();
+
+ TResultSetParser parser(rs);
+ while (parser.TryNextRow()) {
+ for (auto& [colName, colType] : ydbSchema) {
+ switch (colType) {
+ case NYdb::EPrimitiveType::Timestamp:
+ Cout << parser.ColumnParser(colName).GetOptionalTimestamp() << ", ";
+ break;
+ case NYdb::EPrimitiveType::Utf8:
+ Cout << parser.ColumnParser(colName).GetOptionalUtf8() << ", ";
+ break;
+ case NYdb::EPrimitiveType::Int32:
+ Cout << parser.ColumnParser(colName).GetOptionalInt32() << ", ";
+ break;
+ case NYdb::EPrimitiveType::JsonDocument:
+ Cout << parser.ColumnParser(colName).GetOptionalJsonDocument() << ", ";
+ break;
+ default:
+ Cout << "<other>, ";
+ break;
+ }
+ }
+ Cout << Endl;
+ ++numRows;
+ }
+ }
+ }
+ return numRows;
}
}
@@ -60,13 +104,9 @@ Y_UNIT_TEST_SUITE(YdbTableBulkUpsertOlap) {
}
Cerr << "Upsert done: " << TInstant::Now() - start << Endl;
- { // read with long tx read
- TString readRes = Read(connection, tablePath);
- UNIT_ASSERT(!readRes.empty());
-
- auto batch = NArrow::DeserializeBatch(readRes, schema);
- UNIT_ASSERT(batch);
- UNIT_ASSERT(batch->num_rows() > 0);
+ { // Read all
+ ui32 numRows = ScanQuerySelect(client, tablePath);
+ UNIT_ASSERT_GT(numRows, 0);
}
// Negatives
@@ -151,13 +191,9 @@ Y_UNIT_TEST_SUITE(YdbTableBulkUpsertOlap) {
}
Cerr << "Upsert done: " << TInstant::Now() - start << Endl;
- { // read with long tx read
- TString readRes = Read(connection, tablePath);
- UNIT_ASSERT(!readRes.empty());
-
- auto batch = NArrow::DeserializeBatch(readRes, schema);
- UNIT_ASSERT(batch);
- UNIT_ASSERT(batch->num_rows() > 0);
+ { // Read all
+ ui32 numRows = ScanQuerySelect(client, tablePath);
+ UNIT_ASSERT_GT(numRows, 0);
}
// Negatives
@@ -230,11 +266,11 @@ Y_UNIT_TEST_SUITE(YdbTableBulkUpsertOlap) {
}
{ // Big CSV batch
- auto bigBatch = TTestOlap::SampleBatch(true, 150000);
+ auto bigBatch = TTestOlap::SampleBatch(true, 130000);
ui32 batchSize = NArrow::GetBatchDataSize(bigBatch);
Cerr << "rows: " << bigBatch->num_rows() << " batch size: " << batchSize << Endl;
UNIT_ASSERT(batchSize > 15 * 1024 * 1024);
- UNIT_ASSERT(batchSize < 20 * 1024 * 1024);
+ UNIT_ASSERT(batchSize < 17 * 1024 * 1024);
TString bigCsv = TTestOlap::ToCSV(bigBatch);
@@ -246,11 +282,11 @@ Y_UNIT_TEST_SUITE(YdbTableBulkUpsertOlap) {
}
{ // Too big CSV batch
- auto bigBatch = TTestOlap::SampleBatch(true, 200000); // 2 shards, greater then 8 Mb per shard
+        auto bigBatch = TTestOlap::SampleBatch(true, 150000); // 2 shards, greater than 8 Mb per shard
ui32 batchSize = NArrow::GetBatchDataSize(bigBatch);
Cerr << "rows: " << bigBatch->num_rows() << " batch size: " << batchSize << Endl;
- UNIT_ASSERT(batchSize > 20 * 1024 * 1024);
- UNIT_ASSERT(batchSize < 30 * 1024 * 1024);
+ UNIT_ASSERT(batchSize > 16 * 1024 * 1024);
+ UNIT_ASSERT(batchSize < 20 * 1024 * 1024);
TString bigCsv = TTestOlap::ToCSV(bigBatch);
@@ -304,6 +340,11 @@ Y_UNIT_TEST_SUITE(YdbTableBulkUpsertOlap) {
}
Cerr << "Upsert done: " << TInstant::Now() - start << Endl;
+ { // Read all
+ ui32 numRows = ScanQuerySelect(client, tablePath);
+ UNIT_ASSERT_GT(numRows, 0);
+ }
+
// Read
auto res = session.ExecuteDataQuery("SELECT count(*) AS _cnt FROM [/Root/LogsX];",
NYdb::NTable::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
diff --git a/ydb/services/ydb/ydb_common_ut.h b/ydb/services/ydb/ydb_common_ut.h
index c2966ba7440..a3a9bbc1bcd 100644
--- a/ydb/services/ydb/ydb_common_ut.h
+++ b/ydb/services/ydb/ydb_common_ut.h
@@ -189,7 +189,7 @@ struct TTestOlap {
{ "uid", NYdb::EPrimitiveType::Utf8 },
{ "level", NYdb::EPrimitiveType::Int32 },
{ "message", NYdb::EPrimitiveType::Utf8 },
- { "json_payload", NYdb::EPrimitiveType::String },
+ { "json_payload", NYdb::EPrimitiveType::JsonDocument },
{ "ingested_at", NYdb::EPrimitiveType::Timestamp },
{ "saved_at", NYdb::EPrimitiveType::Timestamp },
{ "request_id", NYdb::EPrimitiveType::Utf8 }
@@ -209,7 +209,6 @@ struct TTestOlap {
const TString& storeName = StoreName, const TString& tableName = TableName) {
TString tableDescr = Sprintf(R"(
Name: "%s"
- #MetaShardCount: 1
ColumnShardCount: 4
SchemaPresets {
Name: "default"
@@ -220,7 +219,7 @@ struct TTestOlap {
Columns { Name: "uid" Type: "Utf8" }
Columns { Name: "level" Type: "Int32" }
Columns { Name: "message" Type: "Utf8" }
- Columns { Name: "json_payload" Type: "Json" }
+ Columns { Name: "json_payload" Type: "JsonDocument" }
Columns { Name: "ingested_at" Type: "Timestamp" }
Columns { Name: "saved_at" Type: "Timestamp" }
Columns { Name: "request_id" Type: "Utf8" }
@@ -279,7 +278,7 @@ struct TTestOlap {
Y_VERIFY(NArrow::Append<arrow::StringType>(*builders[3], s));
Y_VERIFY(NArrow::Append<arrow::Int32Type>(*builders[4], i));
Y_VERIFY(NArrow::Append<arrow::StringType>(*builders[5], s + "str"));
- Y_VERIFY(NArrow::Append<arrow::BinaryType>(*builders[6], s + "bin"));
+ Y_VERIFY(NArrow::Append<arrow::BinaryType>(*builders[6], "{ \"value\": " + s + " }"));
Y_VERIFY(NArrow::Append<arrow::StringType>(*builders[9], s + "str"));
}