diff options
author | azevaykin <azevaykin@yandex-team.com> | 2023-10-28 10:32:46 +0300 |
---|---|---|
committer | azevaykin <azevaykin@yandex-team.com> | 2023-10-28 10:56:20 +0300 |
commit | a0680cad764b5edad1359f7a4493deb4cb795996 (patch) | |
tree | 39a453b2d2b62c0430918b2d1d8c787cb69d772e | |
parent | 998d1d441b86d8229f793c3e7be30f46853544ac (diff) | |
download | ydb-a0680cad764b5edad1359f7a4493deb4cb795996.tar.gz |
Check errors in read_table_scan
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_common.cpp | 24 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_common.h | 2 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_read_table.cpp | 36 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_read_table.h | 12 | ||||
-rw-r--r-- | ydb/core/tx/datashard/read_table_scan.cpp | 194 | ||||
-rw-r--r-- | ydb/core/tx/datashard/read_table_scan.h | 7 | ||||
-rw-r--r-- | ydb/core/tx/datashard/read_table_scan_unit.cpp | 7 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/ut_export/ut_export.cpp | 2 |
8 files changed, 217 insertions, 67 deletions
diff --git a/ydb/core/tx/datashard/datashard_ut_common.cpp b/ydb/core/tx/datashard/datashard_ut_common.cpp index 0cdda42c56..ab18bd8673 100644 --- a/ydb/core/tx/datashard/datashard_ut_common.cpp +++ b/ydb/core/tx/datashard/datashard_ut_common.cpp @@ -10,6 +10,7 @@ #include <ydb/core/tx/balance_coverage/balance_coverage_builder.h> #include <ydb/core/tx/tx_allocator/txallocator.h> #include <ydb/core/tx/tx_proxy/proxy.h> +#include <ydb/core/tx/tx_proxy/upload_rows.h> #include <ydb/core/tx/schemeshard/schemeshard_build_index.h> #include <ydb/public/sdk/cpp/client/ydb_result/result.h> @@ -1796,6 +1797,29 @@ void ExecSQL(Tests::TServer::TPtr server, UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Record.GetRef().GetYdbStatus(), code); } +void UploadRows(TTestActorRuntime& runtime, const TString& tablePath, const TVector<std::pair<TString, Ydb::Type_PrimitiveTypeId>>& types, const TVector<TCell>& keys, const TVector<TCell>& values) +{ + auto txTypes = std::make_shared<NTxProxy::TUploadTypes>(); + std::transform(types.cbegin(), types.cend(), std::back_inserter(*txTypes), [](const auto& iter) { + const TString& columnName = iter.first; + Ydb::Type columnType; + columnType.set_type_id(iter.second); + return std::make_pair(columnName, columnType); + }); + + auto txRows = std::make_shared<NTxProxy::TUploadRows>(); + TSerializedCellVec serializedKey(keys); + TString serializedValues(TSerializedCellVec::Serialize(values)); + txRows->emplace_back(serializedKey, serializedValues); + + auto uploadSender = runtime.AllocateEdgeActor(); + auto actor = NTxProxy::CreateUploadRowsInternal(uploadSender, tablePath, txTypes, txRows); + runtime.Register(actor); + + auto ev = runtime.GrabEdgeEventRethrow<TEvTxUserProxy::TEvUploadRowsResponse>(uploadSender); + UNIT_ASSERT_VALUES_EQUAL_C(ev->Get()->Status, Ydb::StatusIds::SUCCESS, "Status: " << ev->Get()->Status << " Issues: " << ev->Get()->Issues.ToOneLineString()); +} + void WaitTabletBecomesOffline(TServer::TPtr server, ui64 tabletId) { struct IsShardStateChange diff --git a/ydb/core/tx/datashard/datashard_ut_common.h b/ydb/core/tx/datashard/datashard_ut_common.h index c959708f97..938b427551 100644 --- a/ydb/core/tx/datashard/datashard_ut_common.h +++ b/ydb/core/tx/datashard/datashard_ut_common.h @@ -708,6 +708,8 @@ void ExecSQL(Tests::TServer::TPtr server, bool dml = true, Ydb::StatusIds::StatusCode code = Ydb::StatusIds::SUCCESS); +void UploadRows(TTestActorRuntime& runtime, const TString& tablePath, const TVector<std::pair<TString, Ydb::Type_PrimitiveTypeId>>& types, const TVector<TCell>& keys, const TVector<TCell>& values); + struct IsTxResultComplete { bool operator()(IEventHandle& ev) { diff --git a/ydb/core/tx/datashard/datashard_ut_read_table.cpp b/ydb/core/tx/datashard/datashard_ut_read_table.cpp index 2f84c1010d..5192d3ceef 100644 --- a/ydb/core/tx/datashard/datashard_ut_read_table.cpp +++ b/ydb/core/tx/datashard/datashard_ut_read_table.cpp @@ -573,6 +573,42 @@ Y_UNIT_TEST_SUITE(DataShardReadTableSnapshots) { "key = 6, value = 66\n"); } + Y_UNIT_TEST(CorruptedDyNumber) { + TPortManager pm; + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetUseRealThreads(false); + + Tests::TServer::TPtr server = new TServer(serverSettings); + auto &runtime = *server->GetRuntime(); + auto sender = runtime.AllocateEdgeActor(); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_TRACE); + runtime.GetAppData().AllowReadTableImmediate = true; + + InitRoot(server, sender); + + CreateShardedTable(server, sender, "/Root", "Table", + TShardedTableOptions().Columns({ + {"key", "Uint32", true, false}, + {"value", "DyNumber", false, false} + })); + + // Write bad DyNumber + UploadRows(runtime, "/Root/Table", + {{"key", Ydb::Type::UINT32}, {"value", Ydb::Type::DYNUMBER}}, + {TCell::Make(ui32(1))}, {TCell::Make(ui32(55555))} + ); + + auto table1state = TReadTableState(server, MakeReadTableSettings("/Root/Table", true)); + + UNIT_ASSERT(!table1state.Next()); + + UNIT_ASSERT(table1state.IsError); + UNIT_ASSERT_VALUES_EQUAL(table1state.LastResult, "ERROR: ExecError\n"); + } + } // Y_UNIT_TEST_SUITE(DataShardReadTableSnapshots) } // namespace NKikimr diff --git a/ydb/core/tx/datashard/datashard_ut_read_table.h b/ydb/core/tx/datashard/datashard_ut_read_table.h index 1967e42172..91cf08367f 100644 --- a/ydb/core/tx/datashard/datashard_ut_read_table.h +++ b/ydb/core/tx/datashard/datashard_ut_read_table.h @@ -29,10 +29,12 @@ namespace NDataShardReadTableTest { struct TEvResult : public TEventLocal<TEvResult, EvResult> { TString Result; bool Finished; + bool IsError; - TEvResult(TString result, bool finished) + TEvResult(TString result, bool finished, bool isError) : Result(std::move(result)) , Finished(finished) + , IsError(isError) { } }; @@ -87,17 +89,17 @@ namespace NDataShardReadTableTest { result << Endl; } NotifyReady(ctx); - ctx.Send(Edge, new TEvResult(result, false)); + ctx.Send(Edge, new TEvResult(result, false, false)); break; } case TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ExecComplete: { NotifyReady(ctx); - ctx.Send(Edge, new TEvResult({ }, true)); + ctx.Send(Edge, new TEvResult({ }, true, false)); return Die(ctx); } default: { NotifyReady(ctx); - ctx.Send(Edge, new TEvResult(TStringBuilder() << "ERROR: " << status << Endl, true)); + ctx.Send(Edge, new TEvResult(TStringBuilder() << "ERROR: " << status << Endl, true, true)); return Die(ctx); } } @@ -201,6 +203,7 @@ namespace NDataShardReadTableTest { TString LastResult; TStringBuilder Result; bool Finished = false; + bool IsError = false; TReadTableState(Tests::TServer::TPtr server, const NTxProxy::TReadTableSettings& settings) : Runtime(*server->GetRuntime()) @@ -218,6 +221,7 @@ namespace NDataShardReadTableTest { auto ev = Runtime.GrabEdgeEventRethrow<TReadTableDriver::TEvResult>(Edge); LastResult = ev->Get()->Result; Finished = ev->Get()->Finished; + IsError = ev->Get()->IsError; Result << LastResult; } diff --git a/ydb/core/tx/datashard/read_table_scan.cpp b/ydb/core/tx/datashard/read_table_scan.cpp index ddf44d9181..818018b398 100644 --- a/ydb/core/tx/datashard/read_table_scan.cpp +++ b/ydb/core/tx/datashard/read_table_scan.cpp @@ -20,81 +20,142 @@ namespace NDataShard { using NTable::EScan; //YdbOld and Ydb.v1 have same value representation -template<typename TOutValue> -Y_FORCE_INLINE void AddCell(TOutValue& row, NScheme::TTypeInfo type, const TCell &cell) +template <typename TOutValue> +Y_FORCE_INLINE bool AddCell(TOutValue& row, NScheme::TTypeInfo type, const TCell& cell, TString& err) { auto &val = *row.add_items(); if (cell.IsNull()) { val.set_null_flag_value(::google::protobuf::NULL_VALUE); - return; + return true; } switch (type.GetTypeId()) { - case NUdf::TDataType<bool>::Id: - val.set_bool_value(cell.AsValue<bool>()); + case NUdf::TDataType<bool>::Id: { + bool value; + if (!cell.ToValue(value, err)) + return false; + val.set_bool_value(value); break; - case NUdf::TDataType<ui8>::Id: - val.set_uint32_value(cell.AsValue<ui8>()); + } + case NUdf::TDataType<ui8>::Id: { + ui8 value; + if (!cell.ToValue(value, err)) + return false; + val.set_uint32_value(value); break; - case NUdf::TDataType<i8>::Id: - val.set_int32_value(cell.AsValue<i8>()); + } + case NUdf::TDataType<i8>::Id: { + i8 value; + if (!cell.ToValue(value, err)) + return false; + val.set_int32_value(value); break; - case NUdf::TDataType<ui16>::Id: - val.set_uint32_value(cell.AsValue<ui16>()); + } + case NUdf::TDataType<ui16>::Id: { + ui16 value; + if (!cell.ToValue(value, err)) + return false; + val.set_uint32_value(value); break; - case NUdf::TDataType<i16>::Id: - val.set_int32_value(cell.AsValue<i16>()); + } + case NUdf::TDataType<i16>::Id: { + i16 value; + if (!cell.ToValue(value, err)) + return false; + val.set_int32_value(value); break; - case NUdf::TDataType<ui32>::Id: - val.set_uint32_value(cell.AsValue<ui32>()); + } + case NUdf::TDataType<ui32>::Id: { + ui32 value; + if (!cell.ToValue(value, err)) + return false; + val.set_uint32_value(value); break; - case NUdf::TDataType<i32>::Id: - val.set_int32_value(cell.AsValue<i32>()); + } + case NUdf::TDataType<i32>::Id: { + i32 value; + if (!cell.ToValue(value, err)) + return false; + val.set_int32_value(value); break; - case NUdf::TDataType<ui64>::Id: - val.set_uint64_value(cell.AsValue<ui64>()); + } + case NUdf::TDataType<ui64>::Id: { + ui64 value; + if (!cell.ToValue(value, err)) + return false; + val.set_uint64_value(value); break; - case NUdf::TDataType<i64>::Id: - val.set_int64_value(cell.AsValue<i64>()); + } + case NUdf::TDataType<i64>::Id: { + i64 value; + if (!cell.ToValue(value, err)) + return false; + val.set_int64_value(value); break; - case NUdf::TDataType<float>::Id: - val.set_float_value(cell.AsValue<float>()); + } + case NUdf::TDataType<float>::Id: { + float value; + if (!cell.ToValue(value, err)) + return false; + val.set_float_value(value); break; - case NUdf::TDataType<double>::Id: - val.set_double_value(cell.AsValue<double>()); + } + case NUdf::TDataType<double>::Id: { + double value; + if (!cell.ToValue(value, err)) + return false; + val.set_double_value(value); break; + } case NUdf::TDataType<NUdf::TJson>::Id: - case NUdf::TDataType<NUdf::TUtf8>::Id: + case NUdf::TDataType<NUdf::TUtf8>::Id: { val.set_text_value(cell.Data(), cell.Size()); break; - case NUdf::TDataType<NUdf::TYson>::Id: + } + case NUdf::TDataType<NUdf::TYson>::Id: { val.set_bytes_value(cell.Data(), cell.Size()); break; - case NUdf::TDataType<NUdf::TDecimal>::Id: - { - Y_DEBUG_ABORT_UNLESS(cell.Size() == 16); - struct TCellData { - ui64 Low; - ui64 High; - }; - const auto data = cell.AsValue<TCellData>(); - val.set_low_128(data.Low); - val.set_high_128(data.High); - } + } + case NUdf::TDataType<NUdf::TDecimal>::Id: { + struct TCellData { + ui64 Low; + ui64 High; + } value; + if (!cell.ToValue(value, err)) + return false; + val.set_low_128(value.Low); + val.set_high_128(value.High); break; - case NUdf::TDataType<NUdf::TDate>::Id: - val.set_uint32_value(cell.AsValue<ui16>()); + } + case NUdf::TDataType<NUdf::TDate>::Id: { + ui16 value; + if (!cell.ToValue(value, err)) + return false; + val.set_uint32_value(value); break; - case NUdf::TDataType<NUdf::TDatetime>::Id: - val.set_uint32_value(cell.AsValue<ui32>()); + } + case NUdf::TDataType<NUdf::TDatetime>::Id: { + ui32 value; + if (!cell.ToValue(value, err)) + return false; + val.set_uint32_value(value); break; - case NUdf::TDataType<NUdf::TTimestamp>::Id: - val.set_uint64_value(cell.AsValue<ui64>()); + } + case NUdf::TDataType<NUdf::TTimestamp>::Id: { + ui64 value; + if (!cell.ToValue(value, err)) + return false; + val.set_uint64_value(value); break; - case NUdf::TDataType<NUdf::TInterval>::Id: - val.set_int64_value(cell.AsValue<i64>()); + } + case NUdf::TDataType<NUdf::TInterval>::Id: { + i64 value; + if (!cell.ToValue(value, err)) + return false; + val.set_int64_value(value); break; + } case NUdf::TDataType<NUdf::TJsonDocument>::Id: { const auto json = NBinaryJson::SerializeToJson(TStringBuf(cell.Data(), cell.Size())); val.set_text_value(json); @@ -102,7 +163,10 @@ Y_FORCE_INLINE void AddCell(TOutValue& row, NScheme::TTypeInfo type, const TCell } case NUdf::TDataType<NUdf::TDyNumber>::Id: { const auto number = NDyNumber::DyNumberToString(TStringBuf(cell.Data(), cell.Size())); - Y_ABORT_UNLESS(number.Defined(), "Invalid DyNumber binary representation"); + if (!number.Defined()) { + err = "Invalid DyNumber binary representation"; + return false; + } val.set_text_value(*number); break; } @@ -111,13 +175,17 @@ Y_FORCE_INLINE void AddCell(TOutValue& row, NScheme::TTypeInfo type, const TCell TString(cell.Data(), cell.Size()), NPg::PgTypeIdFromTypeDesc(type.GetTypeDesc()) ); - Y_ABORT_UNLESS(!result.Error, "Failed to add cell to Ydb::Value: %s", (*result.Error).c_str()); + if (result.Error) { + err = Sprintf("Failed to add cell to Ydb::Value: %s", (*result.Error).c_str()); + return false; + } val.set_text_value(result.Str); break; } default: val.set_bytes_value(cell.Data(), cell.Size()); } + return true; } class TRowsToResult { @@ -139,10 +207,12 @@ public: ui64 GetMessageSize() const { return ResultString.size(); } ui64 GetMessageRows() const { return CurrentMessageRows; } - void PutRow(const NTable::TRowState& row) { + bool PutRow(const NTable::TRowState& row, TString& err) { RowOffsets.push_back(static_cast<ui32>(ResultString.size())); - DoPutRow(row); + if (!DoPutRow(row, err)) + return false; ++CurrentMessageRows; + return true; } void Reserve(ui64 size) @@ -174,7 +244,7 @@ protected: CurrentMessageRows = 0; } private: - virtual void DoPutRow(const NTable::TRowState &row) = 0; + virtual bool DoPutRow(const NTable::TRowState& row, TString& err) = 0; virtual ui32 GetResultVersion() const = 0; ui64 CurrentMessageRows; @@ -198,17 +268,19 @@ public: } private: - void DoPutRow(const NTable::TRowState &row) override + bool DoPutRow(const NTable::TRowState &row, TString& err) override { auto &protoRow = *OldResultSet.add_rows(); auto cells = *row; for (size_t col = 0; col < cells.size(); ++col) { - AddCell(protoRow, ColTypes[col], cells[col]); + if (!AddCell(protoRow, ColTypes[col], cells[col], err)) + return false; } OldResultSet.SerializeToArcadiaStream(&ResultStream); OldResultSet.Clear(); + return true; } ui32 GetResultVersion() const override { return 0; } @@ -240,17 +312,19 @@ public: } private: - void DoPutRow(const NTable::TRowState& row) override + bool DoPutRow(const NTable::TRowState& row, TString& err) override { auto &protoRow = *YdbResultSet.add_rows(); auto cells = *row; for (size_t col = 0; col < cells.size(); ++col) { - AddCell(protoRow, ColTypes[col], cells[col]); + if (!AddCell(protoRow, ColTypes[col], cells[col], err)) + return false; } YdbResultSet.SerializeToArcadiaStream(&ResultStream); YdbResultSet.Clear(); + return true; } ui32 GetResultVersion() const override { return NKikimrTxUserProxy::TReadTableTransaction::YDB_V1; } @@ -584,7 +658,11 @@ private: { Y_DEBUG_ABORT_UNLESS(DebugCheckKeyInRange(key)); - Writer->PutRow(row); + if (!Writer->PutRow(row, Error)) { + LOG_ERROR_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "Got scan fatal error: " << Error); + IsFatalError = true; + return EScan::Final; + } return MaybeSendResponseMessage(false, key); } @@ -622,8 +700,7 @@ private: Driver = nullptr; Die(ctx); - - return new TReadTableProd(Error, SchemaChanged); + return new TReadTableProd(Error, IsFatalError, SchemaChanged); } private: @@ -637,6 +714,7 @@ private: ui64 MessageSizeLimit; ui64 MessageRowsLimit; TString Error; + bool IsFatalError = false; THolder<TRowsToResult> Writer; TUserTable::TCPtr TableInfo; NKikimrTxDataShard::TReadTableTransaction Tx; diff --git a/ydb/core/tx/datashard/read_table_scan.h b/ydb/core/tx/datashard/read_table_scan.h index 0650c6ef67..f395459b31 100644 --- a/ydb/core/tx/datashard/read_table_scan.h +++ b/ydb/core/tx/datashard/read_table_scan.h @@ -8,12 +8,15 @@ namespace NDataShard { class TReadTableProd : public IDestructable { public: - TReadTableProd(const TString &error, bool schemaChanged = false) + TReadTableProd(const TString& error, bool IsFatalError, bool schemaChanged) : Error(error) + , IsFatalError(IsFatalError) , SchemaChanged(schemaChanged) - {} + { + } TString Error; + bool IsFatalError; bool SchemaChanged; }; diff --git a/ydb/core/tx/datashard/read_table_scan_unit.cpp b/ydb/core/tx/datashard/read_table_scan_unit.cpp index 62514580ba..936698e70d 100644 --- a/ydb/core/tx/datashard/read_table_scan_unit.cpp +++ b/ydb/core/tx/datashard/read_table_scan_unit.cpp @@ -162,11 +162,14 @@ EExecutionStatus TReadTableScanUnit::Execute(TOperation::TPtr op, LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, "ReadTable scan complete for " << *op << " at " - << DataShard.TabletID() << " error: " << result->Error); + << DataShard.TabletID() << " error: " << result->Error << ", IsFatalError: " << result->IsFatalError); tx->SetScanTask(0); - if (result->SchemaChanged) { + if (result->IsFatalError) { + BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::ERROR) + ->SetExecutionError(NKikimrTxDataShard::TError::PROGRAM_ERROR, result->Error); + } else if (result->SchemaChanged) { BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::ERROR) ->AddError(NKikimrTxDataShard::TError::SCHEME_CHANGED, result->Error); } else if (result->Error) { diff --git a/ydb/core/tx/schemeshard/ut_export/ut_export.cpp b/ydb/core/tx/schemeshard/ut_export/ut_export.cpp index 0ccc2ab877..c8db1c6081 100644 --- a/ydb/core/tx/schemeshard/ut_export/ut_export.cpp +++ b/ydb/core/tx/schemeshard/ut_export/ut_export.cpp @@ -1373,7 +1373,7 @@ partitioning_settings { TestGetExport(runtime, exportId, "/MyRoot"); } - Y_UNIT_TEST(CorruptedBadDyNumber) { + Y_UNIT_TEST(CorruptedDyNumber) { TTestBasicRuntime runtime; TTestEnv env(runtime, TTestEnvOptions().DisableStatsBatching(true)); ui64 txId = 100; |