aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorudovichenko-r <rvu@ydb.tech>2023-04-06 11:03:46 +0300
committerudovichenko-r <rvu@ydb.tech>2023-04-06 11:03:46 +0300
commit6c00a004990fdcd29f8a065d0f08c6a17d7999b2 (patch)
tree66c66e49e727da25c327db33e157ca37c67d1351
parentc3ca42136c25f3175755452eec3b788524f937c2 (diff)
downloadydb-6c00a004990fdcd29f8a065d0f08c6a17d7999b2.tar.gz
[yql] Restore user data storage on fallback
YQL-15761
-rw-r--r--ydb/core/fq/libs/actors/run_actor.cpp6
-rw-r--r--ydb/library/yql/core/facade/yql_facade.cpp51
-rw-r--r--ydb/library/yql/core/facade/yql_facade.h3
-rw-r--r--ydb/library/yql/core/yql_type_annotation.h8
-rw-r--r--ydb/library/yql/core/yql_user_data_storage.cpp18
-rw-r--r--ydb/library/yql/core/yql_user_data_storage.h7
-rw-r--r--ydb/library/yql/providers/dq/common/yql_dq_settings.cpp2
-rw-r--r--ydb/library/yql/providers/dq/common/yql_dq_settings.h2
-rw-r--r--ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp16
-rw-r--r--ydb/library/yql/providers/dq/provider/yql_dq_gateway.cpp10
-rw-r--r--ydb/library/yql/providers/dq/provider/yql_dq_recapture.cpp2
11 files changed, 69 insertions, 56 deletions
diff --git a/ydb/core/fq/libs/actors/run_actor.cpp b/ydb/core/fq/libs/actors/run_actor.cpp
index b535cd2d2a3..61d46e86615 100644
--- a/ydb/core/fq/libs/actors/run_actor.cpp
+++ b/ydb/core/fq/libs/actors/run_actor.cpp
@@ -731,7 +731,7 @@ private:
for (auto& output : *task.MutableOutputs()) {
if (output.HasSink() && output.GetSink().GetType() == "S3Sink") {
NS3::TSink s3SinkSettings;
- YQL_ENSURE(output.GetSink().GetSettings().UnpackTo(&s3SinkSettings));
+ YQL_ENSURE(output.GetSink().GetSettings().UnpackTo(&s3SinkSettings));
if (s3SinkSettings.GetAtomicUploadCommit()) {
auto prefix = s3SinkSettings.GetUrl() + s3SinkSettings.GetPath();
const auto& [it, isNew] = S3Prefixes.insert(prefix);
@@ -1368,7 +1368,7 @@ private:
TDqConfiguration::TPtr dqConfiguration = MakeIntrusive<TDqConfiguration>();
dqConfiguration->Dispatch(dqGraphParams.GetSettings());
dqConfiguration->FreezeDefaults();
- dqConfiguration->FallbackPolicy = "never";
+ dqConfiguration->FallbackPolicy = EFallbackPolicy::Never;
TEvaluationGraphInfo info;
@@ -1413,7 +1413,7 @@ private:
TDqConfiguration::TPtr dqConfiguration = MakeIntrusive<TDqConfiguration>();
dqConfiguration->Dispatch(dqGraphParams.GetSettings());
dqConfiguration->FreezeDefaults();
- dqConfiguration->FallbackPolicy = "never";
+ dqConfiguration->FallbackPolicy = EFallbackPolicy::Never;
ExecuterId = Register(NYql::NDq::MakeDqExecuter(MakeNodesManagerId(), SelfId(), Params.QueryId, "", dqConfiguration, QueryCounters.Counters, TInstant::Now(), EnableCheckpointCoordinator));
diff --git a/ydb/library/yql/core/facade/yql_facade.cpp b/ydb/library/yql/core/facade/yql_facade.cpp
index 078d67c6525..6c5b2d18b1d 100644
--- a/ydb/library/yql/core/facade/yql_facade.cpp
+++ b/ydb/library/yql/core/facade/yql_facade.cpp
@@ -255,6 +255,7 @@ TProgram::TProgram(
, UdfIndex_(udfIndex)
, UdfIndexPackageSet_(udfIndexPackageSet)
, FileStorage_(fileStorage)
+ , SavedUserDataTable_(userDataTable)
, UserDataStorage_(MakeIntrusive<TUserDataStorage>(fileStorage, userDataTable, udfResolver, udfIndex))
, GatewaysConfig_(gatewaysConfig)
, Filename_(filename)
@@ -934,11 +935,8 @@ TFuture<IGraphTransformer::TStatus> TProgram::AsyncTransformWithFallback(bool ap
&& !TypeCtx_->ForceDq
&& SavedExprRoot_
&& TypeCtx_->DqCaptured
- && TypeCtx_->DqFallbackPolicy != "never")
+ && TypeCtx_->DqFallbackPolicy != EFallbackPolicy::Never)
{
- ExprRoot_ = SavedExprRoot_;
- SavedExprRoot_ = nullptr;
-
auto issues = ExprCtx_->IssueManager.GetIssues();
bool hasDqGatewayError = false;
bool hasDqGatewayFallbackError = false;
@@ -965,18 +963,6 @@ TFuture<IGraphTransformer::TStatus> TProgram::AsyncTransformWithFallback(bool ap
}
};
- std::function<void(const TIssuePtr& issue)> toInfo = [&](const TIssuePtr& issue) {
- if (issue->Severity == TSeverityIds::S_ERROR
- || issue->Severity == TSeverityIds::S_FATAL
- || issue->Severity == TSeverityIds::S_WARNING)
- {
- issue->Severity = TSeverityIds::S_INFO;
- }
- for (const auto& subissue : issue->GetSubIssues()) {
- toInfo(subissue);
- }
- };
-
for (const auto& issue : issues) {
checkIssue(issue);
// check subissues
@@ -985,14 +971,16 @@ TFuture<IGraphTransformer::TStatus> TProgram::AsyncTransformWithFallback(bool ap
}
}
- ExprCtx_->IssueManager.Reset();
-
- if (hasDqGatewayError && !hasDqGatewayFallbackError && TypeCtx_->DqFallbackPolicy.find("always") == TString::npos) {
+ if (hasDqGatewayError && !hasDqGatewayFallbackError && TypeCtx_->DqFallbackPolicy != EFallbackPolicy::Always) {
// unrecoverable error
- ExprCtx_->IssueManager.AddIssues(issues);
return res;
}
+ ExprRoot_ = SavedExprRoot_;
+ SavedExprRoot_ = nullptr;
+ UserDataStorage_->SetUserDataTable(std::move(SavedUserDataTable_));
+
+ ExprCtx_->IssueManager.Reset();
YQL_LOG(DEBUG) << "Fallback, Issues: " << issues.ToString();
ExprCtx_->Reset();
@@ -1007,6 +995,19 @@ TFuture<IGraphTransformer::TStatus> TProgram::AsyncTransformWithFallback(bool ap
CleanupLastSession();
if (hasDqGatewayError || hasDqOptimizeError) {
+
+ std::function<void(const TIssuePtr& issue)> toInfo = [&](const TIssuePtr& issue) {
+ if (issue->Severity == TSeverityIds::S_ERROR
+ || issue->Severity == TSeverityIds::S_FATAL
+ || issue->Severity == TSeverityIds::S_WARNING)
+ {
+ issue->Severity = TSeverityIds::S_INFO;
+ }
+ for (const auto& subissue : issue->GetSubIssues()) {
+ toInfo(subissue);
+ }
+ };
+
TIssue info("DQ cannot execute the query");
info.Severity = TSeverityIds::S_INFO;
@@ -1027,14 +1028,14 @@ TFuture<IGraphTransformer::TStatus> TProgram::AsyncTransformWithFallback(bool ap
ExprCtx_->IssueManager.AddIssues({info});
}
- FallbackCounter ++;
+ ++FallbackCounter_;
// don't execute recapture again
ExprCtx_->Step.Done(TExprStep::Recapture);
AbortHidden_();
return AsyncTransformWithFallback(false);
}
- if (status == IGraphTransformer::TStatus::Error && (TypeCtx_->DqFallbackPolicy == "never" || TypeCtx_->ForceDq)) {
- YQL_LOG(DEBUG) << "Fallback skipped due to per query policy";
+ if (status == IGraphTransformer::TStatus::Error && (TypeCtx_->DqFallbackPolicy == EFallbackPolicy::Never || TypeCtx_->ForceDq)) {
+ YQL_LOG(INFO) << "Fallback skipped due to per query policy";
}
return res;
});
@@ -1202,11 +1203,11 @@ TMaybe<TString> TProgram::GetStatistics(bool totalOnly, THashMap<TString, TStrin
writer.OnInt64Scalar(rusage.MajorPageFaults);
writer.OnEndMap();
- if (FallbackCounter) {
+ if (FallbackCounter_) {
writer.OnKeyedItem("Fallback");
writer.OnBeginMap();
writer.OnKeyedItem("count");
- writer.OnInt64Scalar(FallbackCounter);
+ writer.OnInt64Scalar(FallbackCounter_);
writer.OnEndMap();
}
diff --git a/ydb/library/yql/core/facade/yql_facade.h b/ydb/library/yql/core/facade/yql_facade.h
index aead549e20c..cbbb758efd7 100644
--- a/ydb/library/yql/core/facade/yql_facade.h
+++ b/ydb/library/yql/core/facade/yql_facade.h
@@ -360,6 +360,7 @@ private:
const TUdfIndex::TPtr UdfIndex_;
const TUdfIndexPackageSet::TPtr UdfIndexPackageSet_;
const TFileStoragePtr FileStorage_;
+ TUserDataTable SavedUserDataTable_;
const TUserDataStorage::TPtr UserDataStorage_;
const TGatewaysConfig* GatewaysConfig_;
TString Filename_;
@@ -398,7 +399,7 @@ private:
TString ExtractedQueryParametersMetadataYson_;
const bool EnableRangeComputeFor_;
const IArrowResolver::TPtr ArrowResolver_;
- i64 FallbackCounter = 0;
+ i64 FallbackCounter_ = 0;
const EHiddenMode HiddenMode_ = EHiddenMode::Disable;
THiddenQueryAborter AbortHidden_ = [](){};
};
diff --git a/ydb/library/yql/core/yql_type_annotation.h b/ydb/library/yql/core/yql_type_annotation.h
index f1c633f424c..7e63c2780df 100644
--- a/ydb/library/yql/core/yql_type_annotation.h
+++ b/ydb/library/yql/core/yql_type_annotation.h
@@ -168,6 +168,12 @@ enum class EHiddenMode {
Auto /* "auto" */
};
+enum class EFallbackPolicy {
+ Default /* "default" */,
+ Never /* "never" */,
+ Always /* "always" */
+};
+
struct TUdfCachedInfo {
const TTypeAnnotationNode* FunctionType = nullptr;
const TTypeAnnotationNode* RunConfigType = nullptr;
@@ -218,7 +224,7 @@ struct TTypeAnnotationContext: public TThrRefBase {
bool DiscoveryMode = false;
bool ForceDq = false;
bool DqCaptured = false; // TODO: Add before/after recapture transformers
- TString DqFallbackPolicy = "";
+ EFallbackPolicy DqFallbackPolicy = EFallbackPolicy::Default;
bool StrictTableProps = true;
bool JsonQueryReturnsJsonDocument = false;
bool YsonCastToString = true;
diff --git a/ydb/library/yql/core/yql_user_data_storage.cpp b/ydb/library/yql/core/yql_user_data_storage.cpp
index 1e98287876e..bbca125e5ab 100644
--- a/ydb/library/yql/core/yql_user_data_storage.cpp
+++ b/ydb/library/yql/core/yql_user_data_storage.cpp
@@ -40,8 +40,8 @@ namespace NYql {
TUserDataStorage::TUserDataStorage(TFileStoragePtr fileStorage, TUserDataTable data, IUdfResolver::TPtr udfResolver, TUdfIndex::TPtr udfIndex)
: FileStorage_(std::move(fileStorage))
, UserData_(std::move(data))
- , UdfResolver(std::move(udfResolver))
- , UdfIndex(std::move(udfIndex))
+ , UdfResolver_(std::move(udfResolver))
+ , UdfIndex_(std::move(udfIndex))
{
}
@@ -53,6 +53,10 @@ void TUserDataStorage::SetUrlPreprocessor(IUrlPreprocessing::TPtr urlPreprocessi
UrlPreprocessing_ = std::move(urlPreprocessing);
}
+void TUserDataStorage::SetUserDataTable(TUserDataTable data) {
+ UserData_ = std::move(data);
+}
+
void TUserDataStorage::AddUserDataBlock(const TStringBuf& name, const TUserDataBlock& block) {
const auto key = ComposeUserDataKey(name);
AddUserDataBlock(key, block);
@@ -226,21 +230,21 @@ TUserDataBlock* TUserDataStorage::FreezeUdfNoThrow(const TUserDataKey& key,
}
block->CustomUdfPrefix = customUdfPrefix;
- if (!ScannedUdfs.insert(key).second) {
+ if (!ScannedUdfs_.insert(key).second) {
// already scanned
return block;
}
- if (!UdfIndex) {
+ if (!UdfIndex_) {
return block;
}
try {
TString scope = "ScanUdfStrategy " + key.Alias();
YQL_PROFILE_SCOPE(DEBUG, scope.c_str());
- Y_ENSURE(UdfResolver);
- Y_ENSURE(UdfIndex);
- LoadRichMetadataToUdfIndex(*UdfResolver, *block, TUdfIndex::EOverrideMode::ReplaceWithNew, *UdfIndex);
+ Y_ENSURE(UdfResolver_);
+ Y_ENSURE(UdfIndex_);
+ LoadRichMetadataToUdfIndex(*UdfResolver_, *block, TUdfIndex::EOverrideMode::ReplaceWithNew, *UdfIndex_);
} catch (const std::exception& e) {
errorMessage = TStringBuilder() << "Failed to scan udf with key " << key << ", details: " << e.what();
return nullptr;
diff --git a/ydb/library/yql/core/yql_user_data_storage.h b/ydb/library/yql/core/yql_user_data_storage.h
index aed668e4c15..ef503e071c7 100644
--- a/ydb/library/yql/core/yql_user_data_storage.h
+++ b/ydb/library/yql/core/yql_user_data_storage.h
@@ -19,6 +19,7 @@ public:
TUserDataStorage(TFileStoragePtr fileStorage, TUserDataTable data, IUdfResolver::TPtr udfResolver, TUdfIndex::TPtr udfIndex);
void SetTokenResolver(TTokenResolver tokenResolver);
void SetUrlPreprocessor(IUrlPreprocessing::TPtr urlPreprocessing);
+ void SetUserDataTable(TUserDataTable data);
void AddUserDataBlock(const TStringBuf& name, const TUserDataBlock& block);
void AddUserDataBlock(const TUserDataKey& key, const TUserDataBlock& block);
@@ -71,12 +72,12 @@ private:
private:
THoldingFileStorage FileStorage_;
TUserDataTable UserData_;
- IUdfResolver::TPtr UdfResolver;
- TUdfIndex::TPtr UdfIndex;
+ IUdfResolver::TPtr UdfResolver_;
+ TUdfIndex::TPtr UdfIndex_;
TTokenResolver TokenResolver_;
IUrlPreprocessing::TPtr UrlPreprocessing_;
- THashSet<TUserDataKey, TUserDataKey::THash, TUserDataKey::TEqualTo> ScannedUdfs;
+ THashSet<TUserDataKey, TUserDataKey::THash, TUserDataKey::TEqualTo> ScannedUdfs_;
std::function<void(const TUserDataBlock& block)> ScanUdfStrategy_;
};
diff --git a/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp b/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp
index 3832b4d9e5b..c89b57ce027 100644
--- a/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp
+++ b/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp
@@ -17,7 +17,7 @@ TDqConfiguration::TDqConfiguration() {
REGISTER_SETTING(*this, MaxNetworkRetries);
REGISTER_SETTING(*this, RetryBackoffMs);
REGISTER_SETTING(*this, CollectCoreDumps);
- REGISTER_SETTING(*this, FallbackPolicy);
+ REGISTER_SETTING(*this, FallbackPolicy).Parser([](const TString& v) { return FromString<EFallbackPolicy>(v); });
REGISTER_SETTING(*this, PullRequestTimeoutMs);
REGISTER_SETTING(*this, PingTimeoutMs);
REGISTER_SETTING(*this, UseSimpleYtReader);
diff --git a/ydb/library/yql/providers/dq/common/yql_dq_settings.h b/ydb/library/yql/providers/dq/common/yql_dq_settings.h
index ca1cef9987b..8861b654572 100644
--- a/ydb/library/yql/providers/dq/common/yql_dq_settings.h
+++ b/ydb/library/yql/providers/dq/common/yql_dq_settings.h
@@ -52,7 +52,7 @@ struct TDqSettings {
NCommon::TConfSetting<int, false> MaxNetworkRetries;
NCommon::TConfSetting<ui64, false> RetryBackoffMs;
NCommon::TConfSetting<bool, false> CollectCoreDumps;
- NCommon::TConfSetting<TString, false> FallbackPolicy;
+ NCommon::TConfSetting<EFallbackPolicy, false> FallbackPolicy;
NCommon::TConfSetting<ui64, false> PullRequestTimeoutMs;
NCommon::TConfSetting<ui64, false> PingTimeoutMs;
NCommon::TConfSetting<bool, false> UseSimpleYtReader;
diff --git a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp
index eec224331b5..1cf14115d03 100644
--- a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp
+++ b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp
@@ -767,7 +767,7 @@ private:
enableLocalRun);
if (lambdaResult.first.Level == TStatus::Error) {
- if (State->Settings->FallbackPolicy.Get().GetOrElse("default") == "never"
+ if (State->Settings->FallbackPolicy.Get().GetOrElse(EFallbackPolicy::Default) == EFallbackPolicy::Never
|| State->TypeCtx->ForceDq)
{
return SyncError();
@@ -844,7 +844,7 @@ private:
state->Statistics[state->MetricId++] = res.Statistics;
if (res.Fallback) {
- if (state->Settings->FallbackPolicy.Get().GetOrElse("default") == "never" || state->TypeCtx->ForceDq) {
+ if (state->Settings->FallbackPolicy.Get().GetOrElse(EFallbackPolicy::Default) == EFallbackPolicy::Never || state->TypeCtx->ForceDq) {
auto issues = TIssues{TIssue(ctx.GetPosition(input->Pos()), "Gateway Error").SetCode(TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR, TSeverityIds::S_WARNING)};
issues.AddIssues(res.Issues());
ctx.AssociativeIssues.emplace(input.Get(), std::move(issues));
@@ -1061,10 +1061,10 @@ private:
auto stagesCount = executionPlanner->StagesCount();
if (!executionPlanner->CanFallback()) {
- settings->FallbackPolicy = State->TypeCtx->DqFallbackPolicy = "never";
+ settings->FallbackPolicy = State->TypeCtx->DqFallbackPolicy = EFallbackPolicy::Never;
}
- bool canFallback = (settings->FallbackPolicy.Get().GetOrElse("default") != "never" && !State->TypeCtx->ForceDq);
+ bool canFallback = (settings->FallbackPolicy.Get().GetOrElse(EFallbackPolicy::Default) != EFallbackPolicy::Never && !State->TypeCtx->ForceDq);
if (stagesCount > maxTasksPerOperation && canFallback) {
return SyncStatus(FallbackWithMessage(
@@ -1242,7 +1242,7 @@ private:
if (truncated && !state->TypeCtx->ForceDq && !enableFullResultWrite) {
auto issue = TIssue(ctx.GetPosition(input->Pos()), TStringBuilder() << "DQ cannot execute the query. Cause: " << "too big result " << trStr).SetCode(TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR, TSeverityIds::S_INFO);
- bool error = settings->FallbackPolicy.Get().GetOrElse("default") == "never";
+ bool error = settings->FallbackPolicy.Get().GetOrElse(EFallbackPolicy::Default) == EFallbackPolicy::Never;
for (const auto& i : res.Issues()) {
TIssuePtr subIssue = new TIssue(i);
if (error && subIssue->Severity == TSeverityIds::S_WARNING) {
@@ -1549,10 +1549,10 @@ private:
auto stagesCount = executionPlanner->StagesCount();
if (!executionPlanner->CanFallback()) {
- settings->FallbackPolicy = State->TypeCtx->DqFallbackPolicy = "never";
+ settings->FallbackPolicy = State->TypeCtx->DqFallbackPolicy = EFallbackPolicy::Never;
}
- bool canFallback = (settings->FallbackPolicy.Get().GetOrElse("default") != "never" && !State->TypeCtx->ForceDq);
+ bool canFallback = (settings->FallbackPolicy.Get().GetOrElse(EFallbackPolicy::Default) != EFallbackPolicy::Never && !State->TypeCtx->ForceDq);
if (stagesCount > maxTasksPerOperation && canFallback) {
return FallbackWithMessage(
@@ -1642,7 +1642,7 @@ private:
executionPlanner.Destroy();
- bool neverFallback = settings->FallbackPolicy.Get().GetOrElse("default") == "never";
+ bool neverFallback = settings->FallbackPolicy.Get().GetOrElse(EFallbackPolicy::Default) == EFallbackPolicy::Never;
future.Subscribe([publicIds, state = State, startTime, execState = ExecState, node = input.Get(), neverFallback, logCtx](const NThreading::TFuture<IDqGateway::TResult>& completedFuture) {
YQL_LOG_CTX_ROOT_SESSION_SCOPE(logCtx);
YQL_ENSURE(!completedFuture.HasException());
diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_gateway.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_gateway.cpp
index bd39e74fbbd..dfcaf9ef8a8 100644
--- a/ydb/library/yql/providers/dq/provider/yql_dq_gateway.cpp
+++ b/ydb/library/yql/providers/dq/provider/yql_dq_gateway.cpp
@@ -81,12 +81,12 @@ public:
NYql::TIssues issues;
auto operation = resp.operation();
-
+
for (auto& message_ : *operation.Mutableissues()) {
TDeque<std::remove_reference_t<decltype(message_)>*> queue;
queue.push_front(&message_);
while (!queue.empty()) {
- auto& message = *queue.front();
+ auto& message = *queue.front();
queue.pop_front();
message.Setmessage(NBacktrace::Symbolize(message.Getmessage(), modulesMapping));
for (auto &subMsg : *message.Mutableissues()) {
@@ -175,10 +175,10 @@ public:
) {
auto backoff = TDuration::MilliSeconds(settings->RetryBackoffMs.Get().GetOrElse(1000));
auto promise = NewPromise<TResult>();
- auto fallbackPolicy = settings->FallbackPolicy.Get().GetOrElse("default");
- auto alwaysFallback = fallbackPolicy == "always";
+ const auto fallbackPolicy = settings->FallbackPolicy.Get().GetOrElse(EFallbackPolicy::Default);
+ const auto alwaysFallback = EFallbackPolicy::Always == fallbackPolicy;
auto self = weak_from_this();
- auto callback = [self, promise, sessionId, alwaysFallback, modulesMapping](NGrpc::TGrpcStatus&& status, TResponse&& resp) mutable {
+ auto callback = [self, promise, sessionId, alwaysFallback, modulesMapping](NGrpc::TGrpcStatus&& status, TResponse&& resp) mutable {
auto this_ = self.lock();
if (!this_) {
YQL_CLOG(DEBUG, ProviderDq) << "Gateway was closed: " << sessionId;
diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_recapture.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_recapture.cpp
index a63d4fbb167..e60af247c41 100644
--- a/ydb/library/yql/providers/dq/provider/yql_dq_recapture.cpp
+++ b/ydb/library/yql/providers/dq/provider/yql_dq_recapture.cpp
@@ -104,7 +104,7 @@ public:
}
}
- State_->TypeCtx->DqFallbackPolicy = State_->Settings->FallbackPolicy.Get().GetOrElse("default");
+ State_->TypeCtx->DqFallbackPolicy = State_->Settings->FallbackPolicy.Get().GetOrElse(EFallbackPolicy::Default);
IGraphTransformer::TStatus status = NDq::DqWrapRead(input, output, ctx, *State_->TypeCtx, *State_->Settings);
if (input != output) {