aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorgalaxycrab <UgnineSirdis@ydb.tech>2022-12-01 11:57:46 +0300
committergalaxycrab <UgnineSirdis@ydb.tech>2022-12-01 11:57:46 +0300
commit760ddc6f72eee80407863ad3dd82fc4dd6c3d18b (patch)
treeae86f04b4dfe97e418a0af510d55125e97779fb6
parentd0eadfe2f1ebc3ec61ae8d275058df50faf6adca (diff)
downloadydb-760ddc6f72eee80407863ad3dd82fc4dd6c3d18b.tar.gz
Add logging to write actor. Fix early checkpoint save. Fix loading incorrect checkpoint
Пофиксил вытаскивание из базы некорректного номера чекпоинта. В итоге мы восстанавливались из чекпоинта, который был не последним. При этом новую порцию данных мы читали из PQ, так как коммитили туда офсеты, а записывали в выходной топик актуальные данные по старым офсетам, что приводило к потере данных. На этом фэйлился тест `test_recovery_mz.py::TestRecovery::test_recovery`. Второй баг был в PQ write actor. Если в нём мы вызывали SendData сначала с одним сообщением, потом, до подтверждения его записи, вызывался метод SendData с чекпоинтом, то при условии пустого буффера вот тут https://a.yandex-team.ru/arcadia/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp?rev=rXXXXXX#L158 мы сразу сохраняли стейт с некорректным офсетом (не учитывая ещё не подтверждённый офсет). В итоге мы также затем при загрузке из такого чекпоинта могли терять данные на дедупликации, так как сообщение должно было идти до чекпоинта: ридер читал следующие сообщения, а записывали мы их по неправильному офсету. На этом валился тест `test_recovery.py::TestRecovery::test_recovery`. Также перед записью в PQ в тестах стал дожидаться zero checkpoint, так как это гарантирует нам, что ридеры не отсеят сообщения по времени своего старта.
-rw-r--r--ydb/core/yq/libs/checkpoint_storage/storage_proxy.cpp4
-rw-r--r--ydb/core/yq/libs/checkpoint_storage/ydb_checkpoint_storage.cpp2
-rw-r--r--ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp59
3 files changed, 41 insertions, 24 deletions
diff --git a/ydb/core/yq/libs/checkpoint_storage/storage_proxy.cpp b/ydb/core/yq/libs/checkpoint_storage/storage_proxy.cpp
index f504653e6f2..1f3fb7d46c1 100644
--- a/ydb/core/yq/libs/checkpoint_storage/storage_proxy.cpp
+++ b/ydb/core/yq/libs/checkpoint_storage/storage_proxy.cpp
@@ -305,8 +305,8 @@ void TStorageProxy::Handle(TEvCheckpointStorage::TEvAbortCheckpointRequest::TPtr
void TStorageProxy::Handle(TEvCheckpointStorage::TEvGetCheckpointsMetadataRequest::TPtr& ev) {
const auto* event = ev->Get();
- LOG_STREAMS_STORAGE_SERVICE_DEBUG("[" << event->GraphId << "] Got TEvGetCheckpointsMetadataRequest")
- CheckpointStorage->GetCheckpoints(event->GraphId, event->Statuses, event->Limit, event->LoadGraphDescription)
+ LOG_STREAMS_STORAGE_SERVICE_DEBUG("[" << event->GraphId << "] Got TEvGetCheckpointsMetadataRequest");
+ CheckpointStorage->GetCheckpoints(event->GraphId, event->Statuses, event->Limit, event->LoadGraphDescription)
.Apply([graphId = event->GraphId,
cookie = ev->Cookie,
sender = ev->Sender,
diff --git a/ydb/core/yq/libs/checkpoint_storage/ydb_checkpoint_storage.cpp b/ydb/core/yq/libs/checkpoint_storage/ydb_checkpoint_storage.cpp
index a3308b9206b..2017ead2447 100644
--- a/ydb/core/yq/libs/checkpoint_storage/ydb_checkpoint_storage.cpp
+++ b/ydb/core/yq/libs/checkpoint_storage/ydb_checkpoint_storage.cpp
@@ -372,7 +372,7 @@ TFuture<TDataQueryResult> SelectGraphCheckpoints(const TGenerationContextPtr& co
{join}
WHERE metadata.graph_id = $graph_id
{statuses_condition}
- ORDER BY coordinator_generation, seq_no DESC
+ ORDER BY coordinator_generation DESC, seq_no DESC
{limit_condition};
)sql",
"table_path_prefix"_a = context->TablePathPrefix,
diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp b/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp
index 1b853fc8769..be8916379ad 100644
--- a/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp
+++ b/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp
@@ -37,22 +37,32 @@ namespace NKikimrServices {
constexpr ui32 KQP_COMPUTE = 535;
};
-#define SINK_LOG_T(s) \
- LOG_TRACE_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s)
-#define SINK_LOG_D(s) \
- LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s)
-#define SINK_LOG_I(s) \
- LOG_INFO_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s)
-#define SINK_LOG_W(s) \
- LOG_WARN_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s)
-#define SINK_LOG_N(s) \
- LOG_NOTICE_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s)
-#define SINK_LOG_E(s) \
- LOG_ERROR_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s)
-#define SINK_LOG_C(s) \
- LOG_CRIT_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s)
-#define SINK_LOG(prio, s) \
- LOG_LOG_S(*NActors::TlsActivationContext, prio, NKikimrServices::KQP_COMPUTE, LogPrefix << s)
+#define LOG_T(s) \
+ LOG_TRACE_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, s)
+#define LOG_D(s) \
+ LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, s)
+#define LOG_I(s) \
+ LOG_INFO_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, s)
+#define LOG_W(s) \
+ LOG_WARN_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, s)
+#define LOG_N(s) \
+ LOG_NOTICE_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, s)
+#define LOG_E(s) \
+ LOG_ERROR_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, s)
+#define LOG_C(s) \
+ LOG_CRIT_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, s)
+#define LOG_PRIO(prio, s) \
+ LOG_LOG_S(*NActors::TlsActivationContext, prio, NKikimrServices::KQP_COMPUTE, s)
+
+
+#define SINK_LOG_T(s) LOG_T(LogPrefix << s)
+#define SINK_LOG_D(s) LOG_D(LogPrefix << s)
+#define SINK_LOG_I(s) LOG_I(LogPrefix << s)
+#define SINK_LOG_W(s) LOG_W(LogPrefix << s)
+#define SINK_LOG_N(s) LOG_N(LogPrefix << s)
+#define SINK_LOG_E(s) LOG_E(LogPrefix << s)
+#define SINK_LOG_C(s) LOG_C(LogPrefix << s)
+#define SINK_LOG_PRIO(prio, s) LOG_PRIO(prio, LogPrefix << s)
namespace NYql::NDq {
@@ -117,6 +127,9 @@ public:
const TMaybe<NDqProto::TCheckpoint>& checkpoint,
bool finished) override
{
+ SINK_LOG_T("SendData. Batch: " << batch.size()
+ << ". Checkpoint: " << checkpoint.Defined()
+ << ". Finished: " << finished);
Y_UNUSED(dataSize);
if (finished) {
@@ -155,8 +168,8 @@ public:
}
if (checkpoint) {
- if (Buffer.empty()) {
- Callbacks->OnAsyncOutputStateSaved(BuildState(), OutputIndex, *checkpoint);
+ if (Buffer.empty() && WaitingAcks.empty()) {
+ Callbacks->OnAsyncOutputStateSaved(BuildState(*checkpoint), OutputIndex, *checkpoint);
} else {
DeferredCheckpoints.emplace(NextSeqNo + Buffer.size() - 1, *checkpoint);
}
@@ -180,6 +193,7 @@ public:
if (data.GetVersion() == StateVersion) { // Current version
NPq::NProto::TDqPqTopicSinkState stateProto;
YQL_ENSURE(stateProto.ParseFromString(data.GetBlob()), "Serialized state is corrupted");
+ SINK_LOG_D("Load state: " << stateProto);
SourceId = stateProto.GetSourceId();
ConfirmedSeqNo = stateProto.GetConfirmedSeqNo();
NextSeqNo = ConfirmedSeqNo + 1;
@@ -289,7 +303,7 @@ private:
return !events.empty();
}
- NDqProto::TSinkState BuildState() {
+ NDqProto::TSinkState BuildState(const NDqProto::TCheckpoint& checkpoint) {
NPq::NProto::TDqPqTopicSinkState stateProto;
stateProto.SetSourceId(GetSourceId());
stateProto.SetConfirmedSeqNo(ConfirmedSeqNo);
@@ -300,10 +314,12 @@ private:
auto* data = sinkState.MutableData()->MutableStateData();
data->SetVersion(StateVersion);
data->SetBlob(serializedState);
+ SINK_LOG_T("Save checkpoint " << checkpoint << " state: " << stateProto << ". Sink state: " << sinkState);
return sinkState;
}
void WriteNextMessage(NYdb::NPersQueue::TContinuationToken&& token) {
+ SINK_LOG_T("Write data: \"" << Buffer.front() << "\" with seq no " << NextSeqNo);
WriteSession->Write(std::move(token), Buffer.front(), NextSeqNo++);
WaitingAcks.push(GetItemSize(Buffer.front()));
Buffer.pop();
@@ -331,7 +347,7 @@ private:
for (auto it = ev.Acks.begin(); it != ev.Acks.end(); ++it) {
//Y_VERIFY(it == ev.Acks.begin() || it->SeqNo == std::prev(it)->SeqNo + 1);
-
+ LOG_T(Self.LogPrefix << "Ack seq no " << it->SeqNo);
if (it->State == NYdb::NPersQueue::TWriteSessionEvent::TWriteAck::EEventState::EES_DISCARDED) {
TIssues issues;
issues.AddIssue(TStringBuilder() << "Message with seqNo " << it->SeqNo << " was discarded");
@@ -343,7 +359,8 @@ private:
if (!Self.DeferredCheckpoints.empty() && std::get<0>(Self.DeferredCheckpoints.front()) == it->SeqNo) {
Self.ConfirmedSeqNo = it->SeqNo;
- Self.Callbacks->OnAsyncOutputStateSaved(Self.BuildState(), Self.OutputIndex, std::get<1>(Self.DeferredCheckpoints.front()));
+ const auto& checkpoint = std::get<1>(Self.DeferredCheckpoints.front());
+ Self.Callbacks->OnAsyncOutputStateSaved(Self.BuildState(checkpoint), Self.OutputIndex, checkpoint);
Self.DeferredCheckpoints.pop();
}
}