aboutsummaryrefslogtreecommitdiffstats
path: root/ydb/services/metadata/ds_table/accessor_refresh.cpp
blob: 27770a58ab8e0d4b67662dfd826ec21b4501593d (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
#include "accessor_refresh.h"

namespace NKikimr::NMetadata::NProvider {

void TDSAccessorRefresher::OnBootstrap() {
    TBase::OnBootstrap();
    UnsafeBecome(&TDSAccessorRefresher::StateMain);
    Sender<TEvRefresh>().SendTo(SelfId());
}

void TDSAccessorRefresher::OnNewEnrichedSnapshot(NFetcher::ISnapshot::TPtr snapshot) {
    Schedule(Config.GetRefreshPeriod(), new TEvRefresh());
    CurrentSnapshot = snapshot;
    *CurrentSelection.mutable_result_sets() = std::move(*ProposedProto.mutable_result_sets());
    OnSnapshotModified();
    OnSnapshotRefresh();
}

void TDSAccessorRefresher::OnNewParsedSnapshot(Ydb::Table::ExecuteQueryResult&& qResult, NFetcher::ISnapshot::TPtr snapshot) {
    *ProposedProto.mutable_result_sets() = std::move(*qResult.mutable_result_sets());
    if (CurrentSelection.SerializeAsString() != ProposedProto.SerializeAsString()) {
        ALS_INFO(NKikimrServices::METADATA_PROVIDER) << "New refresher data: " << ProposedProto.DebugString();
        SnapshotConstructor->EnrichSnapshotData(snapshot, InternalController);
    } else {
        CurrentSnapshot->SetActuality(GetRequestedActuality());
        OnSnapshotRefresh();
        Schedule(Config.GetRefreshPeriod(), new TEvRefresh());
    }
}

void TDSAccessorRefresher::OnConstructSnapshotError(const TString& errorMessage) {
    TBase::OnConstructSnapshotError(errorMessage);
    Schedule(Config.GetRefreshPeriod(), new TEvRefresh());
}

void TDSAccessorRefresher::Handle(TEvRefresh::TPtr& /*ev*/) {
    TBase::StartSnapshotsFetching();
}

}