diff options
author | Evgeniy Ivanov <[email protected]> | 2022-06-23 13:54:37 +0300 |
---|---|---|
committer | Evgeniy Ivanov <[email protected]> | 2022-06-23 13:54:37 +0300 |
commit | 8a5992c0672644d86d714b38434563201eab91a9 (patch) | |
tree | 1a2cd77840c03ea480804a28d4347c08bf6a280e | |
parent | 63a7206d158d3b276bb95b79ea2db092d408d1c0 (diff) |
PR from branch users/eivanov89/KIKIMR-13003-iterator-read-v1-fixes
KIKIMR-13003: allow ds iterator on followers
KIKIMR-13003: update protocol description
ref:d8225c8a6154f6f736ecbc8c036c9f610f20265c
-rw-r--r-- | ydb/core/protos/tx_datashard.proto | 55 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard__read_iterator.cpp | 39 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_impl.h | 4 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_read_iterator.cpp | 37 |
4 files changed, 96 insertions, 39 deletions
diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto index 598d68a8fda..bf2b5400927 100644 --- a/ydb/core/protos/tx_datashard.proto +++ b/ydb/core/protos/tx_datashard.proto @@ -1461,26 +1461,47 @@ message TEvGetCompactTableStatsResult { // // The TEvRead message starts a new read request with given queries // and starts streaming a single resultset in the requested format. -// The TEvRead contains number of bytes/rows (quota) the client can handle, -// the server is allowed to reply with multiple TEvReadResult messages -// within the quota. To update quota user uses TEvReadAck message. +// The TEvRead contains number of rows (hard limit) and bytes (soft limit) +// the client can handle, the server is allowed to reply with multiple TEvReadResult +// messages within the quota. To update quota user uses TEvReadAck message. // // The protocol works as follows: // 1. Optionally user obtains LockTxId from TxProxy. +// // 2. Optionally user obtains snapshot version. +// // 3. User starts reading by sending TEvRead to the shard. +// // 4. Shard replies with TEvReadResult, which contains: -// - ReadId which is the same as in TEvRead +// - ReadId which is the same as in TEvRead. Shard gives no guarantee +// that the ReadId will be valid any time. // - Snapshot version, which is useful when it wasn't specified in TEvRead. -// Shard gives no guarantee that the ReadId will be valid any time. // - SeqNo that should be used by user in TEvReadAck // - ContinuationToken, which user can use to restart the read. -// 5. User reads until the end using TEvReadAck to update the quota. -// 6. User stops reading by TEvReadCancel. +// +// 5. On each TEvReadResult except the last one user sends TEvReadAck to update quota and continue reading. +// * If user has already received multiple TEvReadResult messages, it is allowed to send +// single TEvReadAck with SeqNo from the last result. +// * If user received final TEvReadResult or TEvReadResult with error he should not send any reply. +// Note that server will stop sending TEvReadResult messages only if last sent result contains either +// Finished or LimitReached field set. Otherwise until there is no disconnect, user can rely that +// he will receive more TEvReadResult messages. Though to improve latency it's a good practice +// to send TEvReadAck before exhausting quota. +// +// 6. User either receives TEvReadResult with Finished field set or TEvReadResult with error +// or stops reading hisself using TEvReadCancel. // // TEvReadResult is valid only if its Status is equal to Ydb::StatusIds::SUCCESS. // In case of any other status code iterator has been invalidated and further // usage of its ReadId will result in either non-success status code. +// +// Shard is allowed to send TEvReadResult without actual results, but with LimitReached field set. +// E.g. it happens when initial TEvRead has too small quota to read at least single row. +// Receiving TEvReadResult with LimitReached set means that shard will not read more rows until +// user sends ACK with extended quota. +// +// User must detect disconnect and after it happened ignore possible results from the shard. +// Instead new read must be started using ContinuationToken from last successful TEvReadResult. message TEvRead { // User must always provide unique ReadId // Note that shard distinguishes equal ReadId's @@ -1511,6 +1532,12 @@ message TEvRead { optional EScanDataFormat ResultFormat = 6; // Limits the total number of rows or bytes client currently can handle + // MaxRows is hard limit: shard must never violate it + // MaxBytes is soft limit: shard stops only after crossing the limit, i.e. + // if n rows are within MaxBytes and n+1 is not - shard returns n+1 rows. + // + // Note that because of this result will always contain at least one row + // independent on quota. optional uint32 MaxRows = 7; optional uint32 MaxBytes = 8; @@ -1547,12 +1574,11 @@ message TReadContinuationToken { optional bytes LastProcessedKey = 2; } -// Every TEvRead and TEvReadNext produces a result +// Every TEvRead and TEvReadAck produces a result // The result may be partial, in which case client may continue streaming more -// results using TEvReadNext. +// results using TEvReadAck. // -// Note the condition that everything has been read: FirstUnprocessedQuery equals to -// the size of Queries from TEvRead and LastProcessedKey is not set. +// Note the condition that everything has been read: Finished field is set. message TEvReadResult { message TArrowBatch { optional bytes Schema = 1; @@ -1567,7 +1593,7 @@ message TEvReadResult { // same as ReadId from the TEvRead optional uint64 ReadId = 1; - // Specifies SeqNo for the next TEvReadNext call + // Specifies SeqNo for the next TEvReadAck call optional uint64 SeqNo = 2; optional TStatus Status = 3; @@ -1594,9 +1620,14 @@ message TEvReadResult { } } +// After handling ReadResult with specified SeqNo user is ready to handle at most +// MaxRows and MaxBytes. But keep in mind that until shard handles this message +// it can send data based on previous quota. message TEvReadAck { optional uint64 ReadId = 1; optional uint64 SeqNo = 2; + + // see comment for same fields in TEvRead optional uint32 MaxRows = 3; optional uint32 MaxBytes = 4; }; diff --git a/ydb/core/tx/datashard/datashard__read_iterator.cpp b/ydb/core/tx/datashard/datashard__read_iterator.cpp index d80715f5695..a87f5179e89 100644 --- a/ydb/core/tx/datashard/datashard__read_iterator.cpp +++ b/ydb/core/tx/datashard/datashard__read_iterator.cpp @@ -713,7 +713,28 @@ private: state.Reverse = record.GetReverse(); + // note that we must call SyncSchemeOnFollower before we do any kind of checks + if (Self->IsFollower()) { + NKikimrTxDataShard::TError::EKind status = NKikimrTxDataShard::TError::OK; + TString errMessage; + + if (!Self->SyncSchemeOnFollower(txc, ctx, status, errMessage)) { + finished = false; + return; + } + + if (status != NKikimrTxDataShard::TError::OK) { + SetStatusError( + Result->Record, + Ydb::StatusIds::INTERNAL_ERROR, + "Follower not ready"); + finished = true; + return; + } + } + if (state.PathId.OwnerId != Self->TabletID()) { + // owner is schemeshard, read user table if (state.PathId.OwnerId != Self->GetPathOwnerId()) { SetStatusError( Result->Record, @@ -788,6 +809,7 @@ private: userTableInfo->Stats.AccessTime = TAppData::TimeProvider->Now(); } else { + // DS is owner, read system table if (state.PathId.LocalPathId >= TDataShard::Schema::MinLocalTid) { SetStatusError( Result->Record, @@ -842,23 +864,6 @@ private: } if (Self->IsFollower()) { - NKikimrTxDataShard::TError::EKind status = NKikimrTxDataShard::TError::OK; - TString errMessage; - - if (!Self->SyncSchemeOnFollower(txc, ctx, status, errMessage)) { - finished = false; - return; - } - - if (status != NKikimrTxDataShard::TError::OK) { - SetStatusError( - Result->Record, - Ydb::StatusIds::INTERNAL_ERROR, - "Follower not ready"); - finished = true; - return; - } - if (!state.ReadVersion.IsMax()) { // check that follower has this version NIceDb::TNiceDb db(txc.DB); diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h index af75381e1c4..4b5586b8319 100644 --- a/ydb/core/tx/datashard/datashard_impl.h +++ b/ydb/core/tx/datashard/datashard_impl.h @@ -2407,6 +2407,10 @@ protected: HFuncTraced(TEvDataShard::TEvReadColumnsRequest, Handle); HFuncTraced(TEvTabletPipe::TEvServerConnected, Handle); HFuncTraced(TEvTabletPipe::TEvServerDisconnected, Handle); + HFuncTraced(TEvDataShard::TEvRead, Handle); + HFuncTraced(TEvDataShard::TEvReadContinue, Handle); + HFuncTraced(TEvDataShard::TEvReadAck, Handle); + HFuncTraced(TEvDataShard::TEvReadCancel, Handle); default: if (!HandleDefaultEvents(ev, ctx)) { LOG_WARN_S(ctx, NKikimrServices::TX_DATASHARD, "TDataShard::StateWorkAsFollower unhandled event type: " << ev->GetTypeRewrite() diff --git a/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp b/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp index f079cb75be1..316d4f1d6a3 100644 --- a/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp +++ b/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp @@ -24,7 +24,8 @@ using TCellVec = std::vector<TCell>; void CreateTable(Tests::TServer::TPtr server, TActorId sender, const TString &root, - const TString &name) + const TString &name, + bool withFollower = false) { TVector<TShardedTableOptions::TColumn> columns = { {"key1", "Uint32", true, false}, @@ -37,6 +38,9 @@ void CreateTable(Tests::TServer::TPtr server, .Shards(1) .Columns(columns); + if (withFollower) + opts.Followers(1); + CreateShardedTable(server, sender, root, name, opts); } @@ -275,7 +279,8 @@ struct TTableInfo { }; struct TTestHelper { - TTestHelper() { + TTestHelper(bool withFollower = false) { + WithFollower = withFollower; TPortManager pm; TServerSettings serverSettings(pm.GetPort(2134)); serverSettings.SetDomainName("Root") @@ -294,7 +299,7 @@ struct TTestHelper { auto& table1 = Tables["table-1"]; table1.Name = "table-1"; { - CreateTable(Server, Sender, "/Root", "table-1"); + CreateTable(Server, Sender, "/Root", "table-1", WithFollower); ExecSQL(Server, Sender, R"( UPSERT INTO [/Root/table-1] (key1, key2, key3, value) @@ -316,7 +321,7 @@ struct TTestHelper { table1.OwnerId = ownerId; table1.UserTable = tables["table-1"]; - table1.ClientId = runtime.ConnectToPipe(table1.TabletId, Sender, 0, GetPipeConfigWithRetries()); + table1.ClientId = runtime.ConnectToPipe(table1.TabletId, Sender, 0, GetTestPipeConfig()); } auto& table2 = Tables["movies"]; @@ -339,7 +344,7 @@ struct TTestHelper { table2.OwnerId = ownerId; table2.UserTable = tables["movies"]; - table2.ClientId = runtime.ConnectToPipe(table2.TabletId, Sender, 0, GetPipeConfigWithRetries()); + table2.ClientId = runtime.ConnectToPipe(table2.TabletId, Sender, 0, GetTestPipeConfig()); } } @@ -426,7 +431,7 @@ struct TTestHelper { Sender, request, 0, - GetPipeConfigWithRetries(), + GetTestPipeConfig(), table.ClientId); return WaitReadResult(); @@ -450,7 +455,7 @@ struct TTestHelper { table.TabletId, Sender, request, 0, - GetPipeConfigWithRetries(), + GetTestPipeConfig(), table.ClientId); } @@ -465,19 +470,27 @@ struct TTestHelper { Sender, request, 0, - GetPipeConfigWithRetries(), + GetTestPipeConfig(), table.ClientId); } + NTabletPipe::TClientConfig GetTestPipeConfig() { + auto config = GetPipeConfigWithRetries(); + if (WithFollower) + config.ForceFollower = true; + return config; + } + public: + bool WithFollower = false; Tests::TServer::TPtr Server; TActorId Sender; THashMap<TString, TTableInfo> Tables; }; -void TestReadKey(NKikimrTxDataShard::EScanDataFormat format) { - TTestHelper helper; +void TestReadKey(NKikimrTxDataShard::EScanDataFormat format, bool withFollower = false) { + TTestHelper helper(withFollower); for (ui32 k: {1, 3, 5}) { auto request = helper.GetBaseReadRequest("table-1", 1, format); @@ -1236,6 +1249,10 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) { UNIT_ASSERT(readResult3); UNIT_ASSERT_VALUES_EQUAL(readResult3->Record.GetStatus().GetCode(), Ydb::StatusIds::BAD_SESSION); } + + Y_UNIT_TEST(ShouldReadFromFollower) { + TestReadKey(NKikimrTxDataShard::CELLVEC, true); + } }; Y_UNIT_TEST_SUITE(DataShardReadIteratorSysTables) { |