diff options
author | galaxycrab <UgnineSirdis@ydb.tech> | 2022-12-01 11:57:46 +0300 |
---|---|---|
committer | galaxycrab <UgnineSirdis@ydb.tech> | 2022-12-01 11:57:46 +0300 |
commit | 760ddc6f72eee80407863ad3dd82fc4dd6c3d18b (patch) | |
tree | ae86f04b4dfe97e418a0af510d55125e97779fb6 | |
parent | d0eadfe2f1ebc3ec61ae8d275058df50faf6adca (diff) | |
download | ydb-760ddc6f72eee80407863ad3dd82fc4dd6c3d18b.tar.gz |
Add logging to write actor. Fix early checkpoint save. Fix loading incorrect checkpoint
Пофиксил вытаскивание из базы некорректного номера чекпоинта. В итоге мы восстанавливались из чекпоинта, который был не последним. При этом новую порцию данных мы читали из PQ, так как коммитили туда офсеты, а записывали в выходной топик актуальные данные по старым офсетам, что приводило к потере данных. На этом фэйлился тест `test_recovery_mz.py::TestRecovery::test_recovery`.
Второй баг был в PQ write actor. Если в нём мы вызывали SendData сначала с одним сообщением, потом, до подтверждения его записи, вызывался метод SendData с чекпоинтом, то при условии пустого буффера вот тут https://a.yandex-team.ru/arcadia/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp?rev=rXXXXXX#L158 мы сразу сохраняли стейт с некорректным офсетом (не учитывая ещё не подтверждённый офсет). В итоге мы также затем при загрузке из такого чекпоинта могли терять данные на дедупликации, так как сообщение должно было идти до чекпоинта: ридер читал следующие сообщения, а записывали мы их по неправильному офсету. На этом валился тест `test_recovery.py::TestRecovery::test_recovery`.
Также перед записью в PQ в тестах стал дожидаться zero checkpoint, так как это гарантирует нам, что ридеры не отсеят сообщения по времени своего старта.
3 files changed, 41 insertions, 24 deletions
diff --git a/ydb/core/yq/libs/checkpoint_storage/storage_proxy.cpp b/ydb/core/yq/libs/checkpoint_storage/storage_proxy.cpp index f504653e6f2..1f3fb7d46c1 100644 --- a/ydb/core/yq/libs/checkpoint_storage/storage_proxy.cpp +++ b/ydb/core/yq/libs/checkpoint_storage/storage_proxy.cpp @@ -305,8 +305,8 @@ void TStorageProxy::Handle(TEvCheckpointStorage::TEvAbortCheckpointRequest::TPtr void TStorageProxy::Handle(TEvCheckpointStorage::TEvGetCheckpointsMetadataRequest::TPtr& ev) { const auto* event = ev->Get(); - LOG_STREAMS_STORAGE_SERVICE_DEBUG("[" << event->GraphId << "] Got TEvGetCheckpointsMetadataRequest") - CheckpointStorage->GetCheckpoints(event->GraphId, event->Statuses, event->Limit, event->LoadGraphDescription) + LOG_STREAMS_STORAGE_SERVICE_DEBUG("[" << event->GraphId << "] Got TEvGetCheckpointsMetadataRequest"); + CheckpointStorage->GetCheckpoints(event->GraphId, event->Statuses, event->Limit, event->LoadGraphDescription) .Apply([graphId = event->GraphId, cookie = ev->Cookie, sender = ev->Sender, diff --git a/ydb/core/yq/libs/checkpoint_storage/ydb_checkpoint_storage.cpp b/ydb/core/yq/libs/checkpoint_storage/ydb_checkpoint_storage.cpp index a3308b9206b..2017ead2447 100644 --- a/ydb/core/yq/libs/checkpoint_storage/ydb_checkpoint_storage.cpp +++ b/ydb/core/yq/libs/checkpoint_storage/ydb_checkpoint_storage.cpp @@ -372,7 +372,7 @@ TFuture<TDataQueryResult> SelectGraphCheckpoints(const TGenerationContextPtr& co {join} WHERE metadata.graph_id = $graph_id {statuses_condition} - ORDER BY coordinator_generation, seq_no DESC + ORDER BY coordinator_generation DESC, seq_no DESC {limit_condition}; )sql", "table_path_prefix"_a = context->TablePathPrefix, diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp b/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp index 1b853fc8769..be8916379ad 100644 --- a/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp +++ b/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp @@ -37,22 +37,32 @@ namespace NKikimrServices { constexpr ui32 KQP_COMPUTE = 535; }; -#define SINK_LOG_T(s) \ - LOG_TRACE_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s) -#define SINK_LOG_D(s) \ - LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s) -#define SINK_LOG_I(s) \ - LOG_INFO_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s) -#define SINK_LOG_W(s) \ - LOG_WARN_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s) -#define SINK_LOG_N(s) \ - LOG_NOTICE_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s) -#define SINK_LOG_E(s) \ - LOG_ERROR_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s) -#define SINK_LOG_C(s) \ - LOG_CRIT_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s) -#define SINK_LOG(prio, s) \ - LOG_LOG_S(*NActors::TlsActivationContext, prio, NKikimrServices::KQP_COMPUTE, LogPrefix << s) +#define LOG_T(s) \ + LOG_TRACE_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, s) +#define LOG_D(s) \ + LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, s) +#define LOG_I(s) \ + LOG_INFO_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, s) +#define LOG_W(s) \ + LOG_WARN_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, s) +#define LOG_N(s) \ + LOG_NOTICE_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, s) +#define LOG_E(s) \ + LOG_ERROR_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, s) +#define LOG_C(s) \ + LOG_CRIT_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, s) +#define LOG_PRIO(prio, s) \ + LOG_LOG_S(*NActors::TlsActivationContext, prio, NKikimrServices::KQP_COMPUTE, s) + + +#define SINK_LOG_T(s) LOG_T(LogPrefix << s) +#define SINK_LOG_D(s) LOG_D(LogPrefix << s) +#define SINK_LOG_I(s) LOG_I(LogPrefix << s) +#define SINK_LOG_W(s) LOG_W(LogPrefix << s) +#define SINK_LOG_N(s) LOG_N(LogPrefix << s) +#define SINK_LOG_E(s) LOG_E(LogPrefix << s) +#define SINK_LOG_C(s) LOG_C(LogPrefix << s) +#define SINK_LOG_PRIO(prio, s) LOG_PRIO(prio, LogPrefix << s) namespace NYql::NDq { @@ -117,6 +127,9 @@ public: const TMaybe<NDqProto::TCheckpoint>& checkpoint, bool finished) override { + SINK_LOG_T("SendData. Batch: " << batch.size() + << ". Checkpoint: " << checkpoint.Defined() + << ". Finished: " << finished); Y_UNUSED(dataSize); if (finished) { @@ -155,8 +168,8 @@ public: } if (checkpoint) { - if (Buffer.empty()) { - Callbacks->OnAsyncOutputStateSaved(BuildState(), OutputIndex, *checkpoint); + if (Buffer.empty() && WaitingAcks.empty()) { + Callbacks->OnAsyncOutputStateSaved(BuildState(*checkpoint), OutputIndex, *checkpoint); } else { DeferredCheckpoints.emplace(NextSeqNo + Buffer.size() - 1, *checkpoint); } @@ -180,6 +193,7 @@ public: if (data.GetVersion() == StateVersion) { // Current version NPq::NProto::TDqPqTopicSinkState stateProto; YQL_ENSURE(stateProto.ParseFromString(data.GetBlob()), "Serialized state is corrupted"); + SINK_LOG_D("Load state: " << stateProto); SourceId = stateProto.GetSourceId(); ConfirmedSeqNo = stateProto.GetConfirmedSeqNo(); NextSeqNo = ConfirmedSeqNo + 1; @@ -289,7 +303,7 @@ private: return !events.empty(); } - NDqProto::TSinkState BuildState() { + NDqProto::TSinkState BuildState(const NDqProto::TCheckpoint& checkpoint) { NPq::NProto::TDqPqTopicSinkState stateProto; stateProto.SetSourceId(GetSourceId()); stateProto.SetConfirmedSeqNo(ConfirmedSeqNo); @@ -300,10 +314,12 @@ private: auto* data = sinkState.MutableData()->MutableStateData(); data->SetVersion(StateVersion); data->SetBlob(serializedState); + SINK_LOG_T("Save checkpoint " << checkpoint << " state: " << stateProto << ". Sink state: " << sinkState); return sinkState; } void WriteNextMessage(NYdb::NPersQueue::TContinuationToken&& token) { + SINK_LOG_T("Write data: \"" << Buffer.front() << "\" with seq no " << NextSeqNo); WriteSession->Write(std::move(token), Buffer.front(), NextSeqNo++); WaitingAcks.push(GetItemSize(Buffer.front())); Buffer.pop(); @@ -331,7 +347,7 @@ private: for (auto it = ev.Acks.begin(); it != ev.Acks.end(); ++it) { //Y_VERIFY(it == ev.Acks.begin() || it->SeqNo == std::prev(it)->SeqNo + 1); - + LOG_T(Self.LogPrefix << "Ack seq no " << it->SeqNo); if (it->State == NYdb::NPersQueue::TWriteSessionEvent::TWriteAck::EEventState::EES_DISCARDED) { TIssues issues; issues.AddIssue(TStringBuilder() << "Message with seqNo " << it->SeqNo << " was discarded"); @@ -343,7 +359,8 @@ private: if (!Self.DeferredCheckpoints.empty() && std::get<0>(Self.DeferredCheckpoints.front()) == it->SeqNo) { Self.ConfirmedSeqNo = it->SeqNo; - Self.Callbacks->OnAsyncOutputStateSaved(Self.BuildState(), Self.OutputIndex, std::get<1>(Self.DeferredCheckpoints.front())); + const auto& checkpoint = std::get<1>(Self.DeferredCheckpoints.front()); + Self.Callbacks->OnAsyncOutputStateSaved(Self.BuildState(checkpoint), Self.OutputIndex, checkpoint); Self.DeferredCheckpoints.pop(); } } |