summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorEvgeniy Ivanov <[email protected]>2022-06-23 13:54:37 +0300
committerEvgeniy Ivanov <[email protected]>2022-06-23 13:54:37 +0300
commit8a5992c0672644d86d714b38434563201eab91a9 (patch)
tree1a2cd77840c03ea480804a28d4347c08bf6a280e
parent63a7206d158d3b276bb95b79ea2db092d408d1c0 (diff)
PR from branch users/eivanov89/KIKIMR-13003-iterator-read-v1-fixes
KIKIMR-13003: allow ds iterator on followers KIKIMR-13003: update protocol description ref:d8225c8a6154f6f736ecbc8c036c9f610f20265c
-rw-r--r--ydb/core/protos/tx_datashard.proto55
-rw-r--r--ydb/core/tx/datashard/datashard__read_iterator.cpp39
-rw-r--r--ydb/core/tx/datashard/datashard_impl.h4
-rw-r--r--ydb/core/tx/datashard/datashard_ut_read_iterator.cpp37
4 files changed, 96 insertions, 39 deletions
diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto
index 598d68a8fda..bf2b5400927 100644
--- a/ydb/core/protos/tx_datashard.proto
+++ b/ydb/core/protos/tx_datashard.proto
@@ -1461,26 +1461,47 @@ message TEvGetCompactTableStatsResult {
//
// The TEvRead message starts a new read request with given queries
// and starts streaming a single resultset in the requested format.
-// The TEvRead contains number of bytes/rows (quota) the client can handle,
-// the server is allowed to reply with multiple TEvReadResult messages
-// within the quota. To update quota user uses TEvReadAck message.
+// The TEvRead contains number of rows (hard limit) and bytes (soft limit)
+// the client can handle, the server is allowed to reply with multiple TEvReadResult
+// messages within the quota. To update quota user uses TEvReadAck message.
//
// The protocol works as follows:
// 1. Optionally user obtains LockTxId from TxProxy.
+//
// 2. Optionally user obtains snapshot version.
+//
// 3. User starts reading by sending TEvRead to the shard.
+//
// 4. Shard replies with TEvReadResult, which contains:
-// - ReadId which is the same as in TEvRead
+// - ReadId which is the same as in TEvRead. Shard gives no guarantee
+// that the ReadId will remain valid at any given time.
// - Snapshot version, which is useful when it wasn't specified in TEvRead.
-// Shard gives no guarantee that the ReadId will be valid any time.
// - SeqNo that should be used by user in TEvReadAck
// - ContinuationToken, which user can use to restart the read.
-// 5. User reads until the end using TEvReadAck to update the quota.
-// 6. User stops reading by TEvReadCancel.
+//
+// 5. On each TEvReadResult except the last one user sends TEvReadAck to update quota and continue reading.
+// * If user has already received multiple TEvReadResult messages, it is allowed to send
+// single TEvReadAck with SeqNo from the last result.
+// * If user received final TEvReadResult or TEvReadResult with error he should not send any reply.
+// Note that server will stop sending TEvReadResult messages only if the last sent result has either
+// the Finished or the LimitReached field set. Otherwise, as long as there is no disconnect, user can
+// rely on receiving more TEvReadResult messages. Still, to improve latency it's a good practice
+// to send TEvReadAck before exhausting quota.
+//
+// 6. User either receives TEvReadResult with Finished field set or TEvReadResult with error
+// or stops reading himself using TEvReadCancel.
//
// TEvReadResult is valid only if its Status is equal to Ydb::StatusIds::SUCCESS.
// In case of any other status code iterator has been invalidated and further
// usage of its ReadId will result in a non-success status code.
+//
+// Shard is allowed to send TEvReadResult without actual results, but with LimitReached field set.
+// E.g. it happens when the initial TEvRead has a quota too small to read even a single row.
+// Receiving TEvReadResult with LimitReached set means that shard will not read more rows until
+// user sends ACK with extended quota.
+//
+// User must detect a disconnect and, once it has happened, ignore any further results from the shard.
+// Instead, a new read must be started using ContinuationToken from the last successful TEvReadResult.
message TEvRead {
// User must always provide unique ReadId
// Note that shard distinguishes equal ReadId's
@@ -1511,6 +1532,12 @@ message TEvRead {
optional EScanDataFormat ResultFormat = 6;
// Limits the total number of rows or bytes client currently can handle
+ // MaxRows is hard limit: shard must never violate it
+ // MaxBytes is soft limit: shard stops only after crossing the limit, i.e.
+ // if n rows are within MaxBytes and n+1 is not - shard returns n+1 rows.
+ //
+ // Note that because of this, the result will always contain at least one row
+ // regardless of the quota.
optional uint32 MaxRows = 7;
optional uint32 MaxBytes = 8;
@@ -1547,12 +1574,11 @@ message TReadContinuationToken {
optional bytes LastProcessedKey = 2;
}
-// Every TEvRead and TEvReadNext produces a result
+// Every TEvRead and TEvReadAck produces a result
// The result may be partial, in which case client may continue streaming more
-// results using TEvReadNext.
+// results using TEvReadAck.
//
-// Note the condition that everything has been read: FirstUnprocessedQuery equals to
-// the size of Queries from TEvRead and LastProcessedKey is not set.
+// Note the condition that everything has been read: Finished field is set.
message TEvReadResult {
message TArrowBatch {
optional bytes Schema = 1;
@@ -1567,7 +1593,7 @@ message TEvReadResult {
// same as ReadId from the TEvRead
optional uint64 ReadId = 1;
- // Specifies SeqNo for the next TEvReadNext call
+ // Specifies SeqNo for the next TEvReadAck call
optional uint64 SeqNo = 2;
optional TStatus Status = 3;
@@ -1594,9 +1620,14 @@ message TEvReadResult {
}
}
+// After handling ReadResult with specified SeqNo user is ready to handle at most
+// MaxRows and MaxBytes. But keep in mind that until shard handles this message
+// it can send data based on previous quota.
message TEvReadAck {
optional uint64 ReadId = 1;
optional uint64 SeqNo = 2;
+
+ // see comment for same fields in TEvRead
optional uint32 MaxRows = 3;
optional uint32 MaxBytes = 4;
};
diff --git a/ydb/core/tx/datashard/datashard__read_iterator.cpp b/ydb/core/tx/datashard/datashard__read_iterator.cpp
index d80715f5695..a87f5179e89 100644
--- a/ydb/core/tx/datashard/datashard__read_iterator.cpp
+++ b/ydb/core/tx/datashard/datashard__read_iterator.cpp
@@ -713,7 +713,28 @@ private:
state.Reverse = record.GetReverse();
+ // note that we must call SyncSchemeOnFollower before we do any kind of checks
+ if (Self->IsFollower()) {
+ NKikimrTxDataShard::TError::EKind status = NKikimrTxDataShard::TError::OK;
+ TString errMessage;
+
+ if (!Self->SyncSchemeOnFollower(txc, ctx, status, errMessage)) {
+ finished = false;
+ return;
+ }
+
+ if (status != NKikimrTxDataShard::TError::OK) {
+ SetStatusError(
+ Result->Record,
+ Ydb::StatusIds::INTERNAL_ERROR,
+ "Follower not ready");
+ finished = true;
+ return;
+ }
+ }
+
if (state.PathId.OwnerId != Self->TabletID()) {
+ // owner is schemeshard, read user table
if (state.PathId.OwnerId != Self->GetPathOwnerId()) {
SetStatusError(
Result->Record,
@@ -788,6 +809,7 @@ private:
userTableInfo->Stats.AccessTime = TAppData::TimeProvider->Now();
} else {
+ // DS is owner, read system table
if (state.PathId.LocalPathId >= TDataShard::Schema::MinLocalTid) {
SetStatusError(
Result->Record,
@@ -842,23 +864,6 @@ private:
}
if (Self->IsFollower()) {
- NKikimrTxDataShard::TError::EKind status = NKikimrTxDataShard::TError::OK;
- TString errMessage;
-
- if (!Self->SyncSchemeOnFollower(txc, ctx, status, errMessage)) {
- finished = false;
- return;
- }
-
- if (status != NKikimrTxDataShard::TError::OK) {
- SetStatusError(
- Result->Record,
- Ydb::StatusIds::INTERNAL_ERROR,
- "Follower not ready");
- finished = true;
- return;
- }
-
if (!state.ReadVersion.IsMax()) {
// check that follower has this version
NIceDb::TNiceDb db(txc.DB);
diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h
index af75381e1c4..4b5586b8319 100644
--- a/ydb/core/tx/datashard/datashard_impl.h
+++ b/ydb/core/tx/datashard/datashard_impl.h
@@ -2407,6 +2407,10 @@ protected:
HFuncTraced(TEvDataShard::TEvReadColumnsRequest, Handle);
HFuncTraced(TEvTabletPipe::TEvServerConnected, Handle);
HFuncTraced(TEvTabletPipe::TEvServerDisconnected, Handle);
+ HFuncTraced(TEvDataShard::TEvRead, Handle);
+ HFuncTraced(TEvDataShard::TEvReadContinue, Handle);
+ HFuncTraced(TEvDataShard::TEvReadAck, Handle);
+ HFuncTraced(TEvDataShard::TEvReadCancel, Handle);
default:
if (!HandleDefaultEvents(ev, ctx)) {
LOG_WARN_S(ctx, NKikimrServices::TX_DATASHARD, "TDataShard::StateWorkAsFollower unhandled event type: " << ev->GetTypeRewrite()
diff --git a/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp b/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp
index f079cb75be1..316d4f1d6a3 100644
--- a/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp
@@ -24,7 +24,8 @@ using TCellVec = std::vector<TCell>;
void CreateTable(Tests::TServer::TPtr server,
TActorId sender,
const TString &root,
- const TString &name)
+ const TString &name,
+ bool withFollower = false)
{
TVector<TShardedTableOptions::TColumn> columns = {
{"key1", "Uint32", true, false},
@@ -37,6 +38,9 @@ void CreateTable(Tests::TServer::TPtr server,
.Shards(1)
.Columns(columns);
+ if (withFollower)
+ opts.Followers(1);
+
CreateShardedTable(server, sender, root, name, opts);
}
@@ -275,7 +279,8 @@ struct TTableInfo {
};
struct TTestHelper {
- TTestHelper() {
+ TTestHelper(bool withFollower = false) {
+ WithFollower = withFollower;
TPortManager pm;
TServerSettings serverSettings(pm.GetPort(2134));
serverSettings.SetDomainName("Root")
@@ -294,7 +299,7 @@ struct TTestHelper {
auto& table1 = Tables["table-1"];
table1.Name = "table-1";
{
- CreateTable(Server, Sender, "/Root", "table-1");
+ CreateTable(Server, Sender, "/Root", "table-1", WithFollower);
ExecSQL(Server, Sender, R"(
UPSERT INTO [/Root/table-1]
(key1, key2, key3, value)
@@ -316,7 +321,7 @@ struct TTestHelper {
table1.OwnerId = ownerId;
table1.UserTable = tables["table-1"];
- table1.ClientId = runtime.ConnectToPipe(table1.TabletId, Sender, 0, GetPipeConfigWithRetries());
+ table1.ClientId = runtime.ConnectToPipe(table1.TabletId, Sender, 0, GetTestPipeConfig());
}
auto& table2 = Tables["movies"];
@@ -339,7 +344,7 @@ struct TTestHelper {
table2.OwnerId = ownerId;
table2.UserTable = tables["movies"];
- table2.ClientId = runtime.ConnectToPipe(table2.TabletId, Sender, 0, GetPipeConfigWithRetries());
+ table2.ClientId = runtime.ConnectToPipe(table2.TabletId, Sender, 0, GetTestPipeConfig());
}
}
@@ -426,7 +431,7 @@ struct TTestHelper {
Sender,
request,
0,
- GetPipeConfigWithRetries(),
+ GetTestPipeConfig(),
table.ClientId);
return WaitReadResult();
@@ -450,7 +455,7 @@ struct TTestHelper {
table.TabletId,
Sender, request,
0,
- GetPipeConfigWithRetries(),
+ GetTestPipeConfig(),
table.ClientId);
}
@@ -465,19 +470,27 @@ struct TTestHelper {
Sender,
request,
0,
- GetPipeConfigWithRetries(),
+ GetTestPipeConfig(),
table.ClientId);
}
+ NTabletPipe::TClientConfig GetTestPipeConfig() {
+ auto config = GetPipeConfigWithRetries();
+ if (WithFollower)
+ config.ForceFollower = true;
+ return config;
+ }
+
public:
+ bool WithFollower = false;
Tests::TServer::TPtr Server;
TActorId Sender;
THashMap<TString, TTableInfo> Tables;
};
-void TestReadKey(NKikimrTxDataShard::EScanDataFormat format) {
- TTestHelper helper;
+void TestReadKey(NKikimrTxDataShard::EScanDataFormat format, bool withFollower = false) {
+ TTestHelper helper(withFollower);
for (ui32 k: {1, 3, 5}) {
auto request = helper.GetBaseReadRequest("table-1", 1, format);
@@ -1236,6 +1249,10 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) {
UNIT_ASSERT(readResult3);
UNIT_ASSERT_VALUES_EQUAL(readResult3->Record.GetStatus().GetCode(), Ydb::StatusIds::BAD_SESSION);
}
+
+ Y_UNIT_TEST(ShouldReadFromFollower) {
+ TestReadKey(NKikimrTxDataShard::CELLVEC, true);
+ }
};
Y_UNIT_TEST_SUITE(DataShardReadIteratorSysTables) {