aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorazevaykin <azevaykin@yandex-team.com>2023-10-28 10:32:46 +0300
committerazevaykin <azevaykin@yandex-team.com>2023-10-28 10:56:20 +0300
commita0680cad764b5edad1359f7a4493deb4cb795996 (patch)
tree39a453b2d2b62c0430918b2d1d8c787cb69d772e
parent998d1d441b86d8229f793c3e7be30f46853544ac (diff)
downloadydb-a0680cad764b5edad1359f7a4493deb4cb795996.tar.gz
Check errors in read_table_scan
-rw-r--r--ydb/core/tx/datashard/datashard_ut_common.cpp24
-rw-r--r--ydb/core/tx/datashard/datashard_ut_common.h2
-rw-r--r--ydb/core/tx/datashard/datashard_ut_read_table.cpp36
-rw-r--r--ydb/core/tx/datashard/datashard_ut_read_table.h12
-rw-r--r--ydb/core/tx/datashard/read_table_scan.cpp194
-rw-r--r--ydb/core/tx/datashard/read_table_scan.h7
-rw-r--r--ydb/core/tx/datashard/read_table_scan_unit.cpp7
-rw-r--r--ydb/core/tx/schemeshard/ut_export/ut_export.cpp2
8 files changed, 217 insertions, 67 deletions
diff --git a/ydb/core/tx/datashard/datashard_ut_common.cpp b/ydb/core/tx/datashard/datashard_ut_common.cpp
index 0cdda42c56..ab18bd8673 100644
--- a/ydb/core/tx/datashard/datashard_ut_common.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_common.cpp
@@ -10,6 +10,7 @@
#include <ydb/core/tx/balance_coverage/balance_coverage_builder.h>
#include <ydb/core/tx/tx_allocator/txallocator.h>
#include <ydb/core/tx/tx_proxy/proxy.h>
+#include <ydb/core/tx/tx_proxy/upload_rows.h>
#include <ydb/core/tx/schemeshard/schemeshard_build_index.h>
#include <ydb/public/sdk/cpp/client/ydb_result/result.h>
@@ -1796,6 +1797,29 @@ void ExecSQL(Tests::TServer::TPtr server,
UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Record.GetRef().GetYdbStatus(), code);
}
+void UploadRows(TTestActorRuntime& runtime, const TString& tablePath, const TVector<std::pair<TString, Ydb::Type_PrimitiveTypeId>>& types, const TVector<TCell>& keys, const TVector<TCell>& values)
+{
+ auto txTypes = std::make_shared<NTxProxy::TUploadTypes>();
+ std::transform(types.cbegin(), types.cend(), std::back_inserter(*txTypes), [](const auto& iter) {
+ const TString& columnName = iter.first;
+ Ydb::Type columnType;
+ columnType.set_type_id(iter.second);
+ return std::make_pair(columnName, columnType);
+ });
+
+ auto txRows = std::make_shared<NTxProxy::TUploadRows>();
+ TSerializedCellVec serializedKey(keys);
+ TString serializedValues(TSerializedCellVec::Serialize(values));
+ txRows->emplace_back(serializedKey, serializedValues);
+
+ auto uploadSender = runtime.AllocateEdgeActor();
+ auto actor = NTxProxy::CreateUploadRowsInternal(uploadSender, tablePath, txTypes, txRows);
+ runtime.Register(actor);
+
+ auto ev = runtime.GrabEdgeEventRethrow<TEvTxUserProxy::TEvUploadRowsResponse>(uploadSender);
+ UNIT_ASSERT_VALUES_EQUAL_C(ev->Get()->Status, Ydb::StatusIds::SUCCESS, "Status: " << ev->Get()->Status << " Issues: " << ev->Get()->Issues.ToOneLineString());
+}
+
void WaitTabletBecomesOffline(TServer::TPtr server, ui64 tabletId)
{
struct IsShardStateChange
diff --git a/ydb/core/tx/datashard/datashard_ut_common.h b/ydb/core/tx/datashard/datashard_ut_common.h
index c959708f97..938b427551 100644
--- a/ydb/core/tx/datashard/datashard_ut_common.h
+++ b/ydb/core/tx/datashard/datashard_ut_common.h
@@ -708,6 +708,8 @@ void ExecSQL(Tests::TServer::TPtr server,
bool dml = true,
Ydb::StatusIds::StatusCode code = Ydb::StatusIds::SUCCESS);
+void UploadRows(TTestActorRuntime& runtime, const TString& tablePath, const TVector<std::pair<TString, Ydb::Type_PrimitiveTypeId>>& types, const TVector<TCell>& keys, const TVector<TCell>& values);
+
struct IsTxResultComplete {
bool operator()(IEventHandle& ev)
{
diff --git a/ydb/core/tx/datashard/datashard_ut_read_table.cpp b/ydb/core/tx/datashard/datashard_ut_read_table.cpp
index 2f84c1010d..5192d3ceef 100644
--- a/ydb/core/tx/datashard/datashard_ut_read_table.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_read_table.cpp
@@ -573,6 +573,42 @@ Y_UNIT_TEST_SUITE(DataShardReadTableSnapshots) {
"key = 6, value = 66\n");
}
+ Y_UNIT_TEST(CorruptedDyNumber) {
+ TPortManager pm;
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetUseRealThreads(false);
+
+ Tests::TServer::TPtr server = new TServer(serverSettings);
+ auto &runtime = *server->GetRuntime();
+ auto sender = runtime.AllocateEdgeActor();
+
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+ runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_TRACE);
+ runtime.GetAppData().AllowReadTableImmediate = true;
+
+ InitRoot(server, sender);
+
+ CreateShardedTable(server, sender, "/Root", "Table",
+ TShardedTableOptions().Columns({
+ {"key", "Uint32", true, false},
+ {"value", "DyNumber", false, false}
+ }));
+
+ // Write bad DyNumber
+ UploadRows(runtime, "/Root/Table",
+ {{"key", Ydb::Type::UINT32}, {"value", Ydb::Type::DYNUMBER}},
+ {TCell::Make(ui32(1))}, {TCell::Make(ui32(55555))}
+ );
+
+ auto table1state = TReadTableState(server, MakeReadTableSettings("/Root/Table", true));
+
+ UNIT_ASSERT(!table1state.Next());
+
+ UNIT_ASSERT(table1state.IsError);
+ UNIT_ASSERT_VALUES_EQUAL(table1state.LastResult, "ERROR: ExecError\n");
+ }
+
} // Y_UNIT_TEST_SUITE(DataShardReadTableSnapshots)
} // namespace NKikimr
diff --git a/ydb/core/tx/datashard/datashard_ut_read_table.h b/ydb/core/tx/datashard/datashard_ut_read_table.h
index 1967e42172..91cf08367f 100644
--- a/ydb/core/tx/datashard/datashard_ut_read_table.h
+++ b/ydb/core/tx/datashard/datashard_ut_read_table.h
@@ -29,10 +29,12 @@ namespace NDataShardReadTableTest {
struct TEvResult : public TEventLocal<TEvResult, EvResult> {
TString Result;
bool Finished;
+ bool IsError;
- TEvResult(TString result, bool finished)
+ TEvResult(TString result, bool finished, bool isError)
: Result(std::move(result))
, Finished(finished)
+ , IsError(isError)
{ }
};
@@ -87,17 +89,17 @@ namespace NDataShardReadTableTest {
result << Endl;
}
NotifyReady(ctx);
- ctx.Send(Edge, new TEvResult(result, false));
+ ctx.Send(Edge, new TEvResult(result, false, false));
break;
}
case TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ExecComplete: {
NotifyReady(ctx);
- ctx.Send(Edge, new TEvResult({ }, true));
+ ctx.Send(Edge, new TEvResult({ }, true, false));
return Die(ctx);
}
default: {
NotifyReady(ctx);
- ctx.Send(Edge, new TEvResult(TStringBuilder() << "ERROR: " << status << Endl, true));
+ ctx.Send(Edge, new TEvResult(TStringBuilder() << "ERROR: " << status << Endl, true, true));
return Die(ctx);
}
}
@@ -201,6 +203,7 @@ namespace NDataShardReadTableTest {
TString LastResult;
TStringBuilder Result;
bool Finished = false;
+ bool IsError = false;
TReadTableState(Tests::TServer::TPtr server, const NTxProxy::TReadTableSettings& settings)
: Runtime(*server->GetRuntime())
@@ -218,6 +221,7 @@ namespace NDataShardReadTableTest {
auto ev = Runtime.GrabEdgeEventRethrow<TReadTableDriver::TEvResult>(Edge);
LastResult = ev->Get()->Result;
Finished = ev->Get()->Finished;
+ IsError = ev->Get()->IsError;
Result << LastResult;
}
diff --git a/ydb/core/tx/datashard/read_table_scan.cpp b/ydb/core/tx/datashard/read_table_scan.cpp
index ddf44d9181..818018b398 100644
--- a/ydb/core/tx/datashard/read_table_scan.cpp
+++ b/ydb/core/tx/datashard/read_table_scan.cpp
@@ -20,81 +20,142 @@ namespace NDataShard {
using NTable::EScan;
//YdbOld and Ydb.v1 have same value representation
-template<typename TOutValue>
-Y_FORCE_INLINE void AddCell(TOutValue& row, NScheme::TTypeInfo type, const TCell &cell)
+template <typename TOutValue>
+Y_FORCE_INLINE bool AddCell(TOutValue& row, NScheme::TTypeInfo type, const TCell& cell, TString& err)
{
auto &val = *row.add_items();
if (cell.IsNull()) {
val.set_null_flag_value(::google::protobuf::NULL_VALUE);
- return;
+ return true;
}
switch (type.GetTypeId()) {
- case NUdf::TDataType<bool>::Id:
- val.set_bool_value(cell.AsValue<bool>());
+ case NUdf::TDataType<bool>::Id: {
+ bool value;
+ if (!cell.ToValue(value, err))
+ return false;
+ val.set_bool_value(value);
break;
- case NUdf::TDataType<ui8>::Id:
- val.set_uint32_value(cell.AsValue<ui8>());
+ }
+ case NUdf::TDataType<ui8>::Id: {
+ ui8 value;
+ if (!cell.ToValue(value, err))
+ return false;
+ val.set_uint32_value(value);
break;
- case NUdf::TDataType<i8>::Id:
- val.set_int32_value(cell.AsValue<i8>());
+ }
+ case NUdf::TDataType<i8>::Id: {
+ i8 value;
+ if (!cell.ToValue(value, err))
+ return false;
+ val.set_int32_value(value);
break;
- case NUdf::TDataType<ui16>::Id:
- val.set_uint32_value(cell.AsValue<ui16>());
+ }
+ case NUdf::TDataType<ui16>::Id: {
+ ui16 value;
+ if (!cell.ToValue(value, err))
+ return false;
+ val.set_uint32_value(value);
break;
- case NUdf::TDataType<i16>::Id:
- val.set_int32_value(cell.AsValue<i16>());
+ }
+ case NUdf::TDataType<i16>::Id: {
+ i16 value;
+ if (!cell.ToValue(value, err))
+ return false;
+ val.set_int32_value(value);
break;
- case NUdf::TDataType<ui32>::Id:
- val.set_uint32_value(cell.AsValue<ui32>());
+ }
+ case NUdf::TDataType<ui32>::Id: {
+ ui32 value;
+ if (!cell.ToValue(value, err))
+ return false;
+ val.set_uint32_value(value);
break;
- case NUdf::TDataType<i32>::Id:
- val.set_int32_value(cell.AsValue<i32>());
+ }
+ case NUdf::TDataType<i32>::Id: {
+ i32 value;
+ if (!cell.ToValue(value, err))
+ return false;
+ val.set_int32_value(value);
break;
- case NUdf::TDataType<ui64>::Id:
- val.set_uint64_value(cell.AsValue<ui64>());
+ }
+ case NUdf::TDataType<ui64>::Id: {
+ ui64 value;
+ if (!cell.ToValue(value, err))
+ return false;
+ val.set_uint64_value(value);
break;
- case NUdf::TDataType<i64>::Id:
- val.set_int64_value(cell.AsValue<i64>());
+ }
+ case NUdf::TDataType<i64>::Id: {
+ i64 value;
+ if (!cell.ToValue(value, err))
+ return false;
+ val.set_int64_value(value);
break;
- case NUdf::TDataType<float>::Id:
- val.set_float_value(cell.AsValue<float>());
+ }
+ case NUdf::TDataType<float>::Id: {
+ float value;
+ if (!cell.ToValue(value, err))
+ return false;
+ val.set_float_value(value);
break;
- case NUdf::TDataType<double>::Id:
- val.set_double_value(cell.AsValue<double>());
+ }
+ case NUdf::TDataType<double>::Id: {
+ double value;
+ if (!cell.ToValue(value, err))
+ return false;
+ val.set_double_value(value);
break;
+ }
case NUdf::TDataType<NUdf::TJson>::Id:
- case NUdf::TDataType<NUdf::TUtf8>::Id:
+ case NUdf::TDataType<NUdf::TUtf8>::Id: {
val.set_text_value(cell.Data(), cell.Size());
break;
- case NUdf::TDataType<NUdf::TYson>::Id:
+ }
+ case NUdf::TDataType<NUdf::TYson>::Id: {
val.set_bytes_value(cell.Data(), cell.Size());
break;
- case NUdf::TDataType<NUdf::TDecimal>::Id:
- {
- Y_DEBUG_ABORT_UNLESS(cell.Size() == 16);
- struct TCellData {
- ui64 Low;
- ui64 High;
- };
- const auto data = cell.AsValue<TCellData>();
- val.set_low_128(data.Low);
- val.set_high_128(data.High);
- }
+ }
+ case NUdf::TDataType<NUdf::TDecimal>::Id: {
+ struct TCellData {
+ ui64 Low;
+ ui64 High;
+ } value;
+ if (!cell.ToValue(value, err))
+ return false;
+ val.set_low_128(value.Low);
+ val.set_high_128(value.High);
break;
- case NUdf::TDataType<NUdf::TDate>::Id:
- val.set_uint32_value(cell.AsValue<ui16>());
+ }
+ case NUdf::TDataType<NUdf::TDate>::Id: {
+ ui16 value;
+ if (!cell.ToValue(value, err))
+ return false;
+ val.set_uint32_value(value);
break;
- case NUdf::TDataType<NUdf::TDatetime>::Id:
- val.set_uint32_value(cell.AsValue<ui32>());
+ }
+ case NUdf::TDataType<NUdf::TDatetime>::Id: {
+ ui32 value;
+ if (!cell.ToValue(value, err))
+ return false;
+ val.set_uint32_value(value);
break;
- case NUdf::TDataType<NUdf::TTimestamp>::Id:
- val.set_uint64_value(cell.AsValue<ui64>());
+ }
+ case NUdf::TDataType<NUdf::TTimestamp>::Id: {
+ ui64 value;
+ if (!cell.ToValue(value, err))
+ return false;
+ val.set_uint64_value(value);
break;
- case NUdf::TDataType<NUdf::TInterval>::Id:
- val.set_int64_value(cell.AsValue<i64>());
+ }
+ case NUdf::TDataType<NUdf::TInterval>::Id: {
+ i64 value;
+ if (!cell.ToValue(value, err))
+ return false;
+ val.set_int64_value(value);
break;
+ }
case NUdf::TDataType<NUdf::TJsonDocument>::Id: {
const auto json = NBinaryJson::SerializeToJson(TStringBuf(cell.Data(), cell.Size()));
val.set_text_value(json);
@@ -102,7 +163,10 @@ Y_FORCE_INLINE void AddCell(TOutValue& row, NScheme::TTypeInfo type, const TCell
}
case NUdf::TDataType<NUdf::TDyNumber>::Id: {
const auto number = NDyNumber::DyNumberToString(TStringBuf(cell.Data(), cell.Size()));
- Y_ABORT_UNLESS(number.Defined(), "Invalid DyNumber binary representation");
+ if (!number.Defined()) {
+ err = "Invalid DyNumber binary representation";
+ return false;
+ }
val.set_text_value(*number);
break;
}
@@ -111,13 +175,17 @@ Y_FORCE_INLINE void AddCell(TOutValue& row, NScheme::TTypeInfo type, const TCell
TString(cell.Data(), cell.Size()),
NPg::PgTypeIdFromTypeDesc(type.GetTypeDesc())
);
- Y_ABORT_UNLESS(!result.Error, "Failed to add cell to Ydb::Value: %s", (*result.Error).c_str());
+ if (result.Error) {
+ err = Sprintf("Failed to add cell to Ydb::Value: %s", (*result.Error).c_str());
+ return false;
+ }
val.set_text_value(result.Str);
break;
}
default:
val.set_bytes_value(cell.Data(), cell.Size());
}
+ return true;
}
class TRowsToResult {
@@ -139,10 +207,12 @@ public:
ui64 GetMessageSize() const { return ResultString.size(); }
ui64 GetMessageRows() const { return CurrentMessageRows; }
- void PutRow(const NTable::TRowState& row) {
+ bool PutRow(const NTable::TRowState& row, TString& err) {
RowOffsets.push_back(static_cast<ui32>(ResultString.size()));
- DoPutRow(row);
+ if (!DoPutRow(row, err))
+ return false;
++CurrentMessageRows;
+ return true;
}
void Reserve(ui64 size)
@@ -174,7 +244,7 @@ protected:
CurrentMessageRows = 0;
}
private:
- virtual void DoPutRow(const NTable::TRowState &row) = 0;
+ virtual bool DoPutRow(const NTable::TRowState& row, TString& err) = 0;
virtual ui32 GetResultVersion() const = 0;
ui64 CurrentMessageRows;
@@ -198,17 +268,19 @@ public:
}
private:
- void DoPutRow(const NTable::TRowState &row) override
+ bool DoPutRow(const NTable::TRowState &row, TString& err) override
{
auto &protoRow = *OldResultSet.add_rows();
auto cells = *row;
for (size_t col = 0; col < cells.size(); ++col) {
- AddCell(protoRow, ColTypes[col], cells[col]);
+ if (!AddCell(protoRow, ColTypes[col], cells[col], err))
+ return false;
}
OldResultSet.SerializeToArcadiaStream(&ResultStream);
OldResultSet.Clear();
+ return true;
}
ui32 GetResultVersion() const override { return 0; }
@@ -240,17 +312,19 @@ public:
}
private:
- void DoPutRow(const NTable::TRowState& row) override
+ bool DoPutRow(const NTable::TRowState& row, TString& err) override
{
auto &protoRow = *YdbResultSet.add_rows();
auto cells = *row;
for (size_t col = 0; col < cells.size(); ++col) {
- AddCell(protoRow, ColTypes[col], cells[col]);
+ if (!AddCell(protoRow, ColTypes[col], cells[col], err))
+ return false;
}
YdbResultSet.SerializeToArcadiaStream(&ResultStream);
YdbResultSet.Clear();
+ return true;
}
ui32 GetResultVersion() const override { return NKikimrTxUserProxy::TReadTableTransaction::YDB_V1; }
@@ -584,7 +658,11 @@ private:
{
Y_DEBUG_ABORT_UNLESS(DebugCheckKeyInRange(key));
- Writer->PutRow(row);
+ if (!Writer->PutRow(row, Error)) {
+ LOG_ERROR_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "Got scan fatal error: " << Error);
+ IsFatalError = true;
+ return EScan::Final;
+ }
return MaybeSendResponseMessage(false, key);
}
@@ -622,8 +700,7 @@ private:
Driver = nullptr;
Die(ctx);
-
- return new TReadTableProd(Error, SchemaChanged);
+ return new TReadTableProd(Error, IsFatalError, SchemaChanged);
}
private:
@@ -637,6 +714,7 @@ private:
ui64 MessageSizeLimit;
ui64 MessageRowsLimit;
TString Error;
+ bool IsFatalError = false;
THolder<TRowsToResult> Writer;
TUserTable::TCPtr TableInfo;
NKikimrTxDataShard::TReadTableTransaction Tx;
diff --git a/ydb/core/tx/datashard/read_table_scan.h b/ydb/core/tx/datashard/read_table_scan.h
index 0650c6ef67..f395459b31 100644
--- a/ydb/core/tx/datashard/read_table_scan.h
+++ b/ydb/core/tx/datashard/read_table_scan.h
@@ -8,12 +8,15 @@ namespace NDataShard {
class TReadTableProd : public IDestructable {
public:
- TReadTableProd(const TString &error, bool schemaChanged = false)
+ TReadTableProd(const TString& error, bool IsFatalError, bool schemaChanged)
: Error(error)
+ , IsFatalError(IsFatalError)
, SchemaChanged(schemaChanged)
- {}
+ {
+ }
TString Error;
+ bool IsFatalError;
bool SchemaChanged;
};
diff --git a/ydb/core/tx/datashard/read_table_scan_unit.cpp b/ydb/core/tx/datashard/read_table_scan_unit.cpp
index 62514580ba..936698e70d 100644
--- a/ydb/core/tx/datashard/read_table_scan_unit.cpp
+++ b/ydb/core/tx/datashard/read_table_scan_unit.cpp
@@ -162,11 +162,14 @@ EExecutionStatus TReadTableScanUnit::Execute(TOperation::TPtr op,
LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD,
"ReadTable scan complete for " << *op << " at "
- << DataShard.TabletID() << " error: " << result->Error);
+ << DataShard.TabletID() << " error: " << result->Error << ", IsFatalError: " << result->IsFatalError);
tx->SetScanTask(0);
- if (result->SchemaChanged) {
+ if (result->IsFatalError) {
+ BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::ERROR)
+ ->SetExecutionError(NKikimrTxDataShard::TError::PROGRAM_ERROR, result->Error);
+ } else if (result->SchemaChanged) {
BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::ERROR)
->AddError(NKikimrTxDataShard::TError::SCHEME_CHANGED, result->Error);
} else if (result->Error) {
diff --git a/ydb/core/tx/schemeshard/ut_export/ut_export.cpp b/ydb/core/tx/schemeshard/ut_export/ut_export.cpp
index 0ccc2ab877..c8db1c6081 100644
--- a/ydb/core/tx/schemeshard/ut_export/ut_export.cpp
+++ b/ydb/core/tx/schemeshard/ut_export/ut_export.cpp
@@ -1373,7 +1373,7 @@ partitioning_settings {
TestGetExport(runtime, exportId, "/MyRoot");
}
- Y_UNIT_TEST(CorruptedBadDyNumber) {
+ Y_UNIT_TEST(CorruptedDyNumber) {
TTestBasicRuntime runtime;
TTestEnv env(runtime, TTestEnvOptions().DisableStatsBatching(true));
ui64 txId = 100;