blob: 27770a58ab8e0d4b67662dfd826ec21b4501593d (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
#include "accessor_refresh.h"
namespace NKikimr::NMetadata::NProvider {
void TDSAccessorRefresher::OnBootstrap() {
TBase::OnBootstrap();
UnsafeBecome(&TDSAccessorRefresher::StateMain);
Sender<TEvRefresh>().SendTo(SelfId());
}
// Installs an enriched snapshot as the current one.
//
// snapshot - the enriched snapshot that becomes CurrentSnapshot.
//
// Also promotes the proposed result sets to CurrentSelection, notifies the
// "modified" and "refresh" hooks (in that order) and schedules the next
// TEvRefresh after the configured refresh period.
void TDSAccessorRefresher::OnNewEnrichedSnapshot(NFetcher::ISnapshot::TPtr snapshot) {
    // Arm the next refresh first, as the original flow does.
    Schedule(Config.GetRefreshPeriod(), new TEvRefresh());

    CurrentSnapshot = snapshot;

    // The proposed data is now the accepted selection; move it instead of
    // copying. ProposedProto's result sets are left in a moved-from state
    // and are overwritten in OnNewParsedSnapshot before their next use here.
    auto& proposedSets = *ProposedProto.mutable_result_sets();
    auto& acceptedSets = *CurrentSelection.mutable_result_sets();
    acceptedSets = std::move(proposedSets);

    OnSnapshotModified();
    OnSnapshotRefresh();
}
// Handles a freshly fetched and parsed query result.
//
// qResult  - query result; its result sets are moved into ProposedProto.
// snapshot - snapshot parsed from that result; passed to the snapshot
//            constructor for enrichment when the data is treated as new.
//
// If a snapshot is already installed and the proposed data serializes to the
// same bytes as CurrentSelection, only the actuality of the current snapshot
// is updated, the refresh hook is called and the next TEvRefresh is
// scheduled. Otherwise the proposed data is logged and sent for enrichment.
void TDSAccessorRefresher::OnNewParsedSnapshot(Ydb::Table::ExecuteQueryResult&& qResult, NFetcher::ISnapshot::TPtr snapshot) {
    *ProposedProto.mutable_result_sets() = std::move(*qResult.mutable_result_sets());

    // The "unchanged" shortcut dereferences CurrentSnapshot, so it is valid
    // only after a snapshot has been installed. Without this guard, a first
    // result that happens to serialize like the still-default
    // CurrentSelection would dereference a null pointer.
    const bool hasSnapshot = !!CurrentSnapshot;
    const bool unchanged = hasSnapshot
        && CurrentSelection.SerializeAsString() == ProposedProto.SerializeAsString();

    if (unchanged) {
        CurrentSnapshot->SetActuality(GetRequestedActuality());
        OnSnapshotRefresh();
        Schedule(Config.GetRefreshPeriod(), new TEvRefresh());
        return;
    }

    ALS_INFO(NKikimrServices::METADATA_PROVIDER) << "New refresher data: " << ProposedProto.DebugString();
    // No Schedule() on this path. NOTE(review): the next refresh is presumably
    // armed by OnNewEnrichedSnapshot / OnConstructSnapshotError once
    // enrichment completes - confirm against the snapshot constructor.
    SnapshotConstructor->EnrichSnapshotData(snapshot, InternalController);
}
// Snapshot-construction failure hook.
//
// errorMessage - description of the failure, forwarded to the base class.
//
// After the base-class handling, another TEvRefresh is scheduled after the
// configured refresh period.
void TDSAccessorRefresher::OnConstructSnapshotError(const TString& errorMessage) {
    TBase::OnConstructSnapshotError(errorMessage);

    const auto retryDelay = Config.GetRefreshPeriod();
    Schedule(retryDelay, new TEvRefresh());
}
// TEvRefresh handler: the event carries no data that is used here; it only
// triggers a new round of snapshot fetching via the base class.
void TDSAccessorRefresher::Handle(TEvRefresh::TPtr& /*ev*/) {
    TBase::StartSnapshotsFetching();
}
}
|