/** * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. * SPDX-License-Identifier: Apache-2.0. */ #include <aws/s3/model/SelectObjectContentHandler.h> #include <aws/s3/S3ErrorMarshaller.h> #include <aws/core/client/CoreErrors.h> #include <aws/core/utils/event/EventStreamErrors.h> #include <aws/core/utils/logging/LogMacros.h> #include <aws/core/utils/xml/XmlSerializer.h> using namespace Aws::S3::Model; using namespace Aws::Utils::Event; using namespace Aws::Utils::Xml; namespace Aws { namespace S3 { namespace Model { using namespace Aws::Client; static const char SELECTOBJECTCONTENT_HANDLER_CLASS_TAG[] = "SelectObjectContentHandler"; SelectObjectContentHandler::SelectObjectContentHandler() : EventStreamHandler() { m_onRecordsEvent = [&](const RecordsEvent&) { AWS_LOGSTREAM_TRACE(SELECTOBJECTCONTENT_HANDLER_CLASS_TAG, "RecordsEvent received."); }; m_onStatsEvent = [&](const StatsEvent&) { AWS_LOGSTREAM_TRACE(SELECTOBJECTCONTENT_HANDLER_CLASS_TAG, "StatsEvent received."); }; m_onProgressEvent = [&](const ProgressEvent&) { AWS_LOGSTREAM_TRACE(SELECTOBJECTCONTENT_HANDLER_CLASS_TAG, "ProgressEvent received."); }; m_onContinuationEvent = [&]() { AWS_LOGSTREAM_TRACE(SELECTOBJECTCONTENT_HANDLER_CLASS_TAG, "ContinuationEvent received."); }; m_onEndEvent = [&]() { AWS_LOGSTREAM_TRACE(SELECTOBJECTCONTENT_HANDLER_CLASS_TAG, "EndEvent received."); }; m_onError = [&](const AWSError<S3Errors>& error) { AWS_LOGSTREAM_TRACE(SELECTOBJECTCONTENT_HANDLER_CLASS_TAG, "S3 Errors received, " << error); }; } void SelectObjectContentHandler::OnEvent() { // Handler internal error during event stream decoding. if (!*this) { AWSError<CoreErrors> error = EventStreamErrorsMapper::GetAwsErrorForEventStreamError(GetInternalError()); error.SetMessage(GetEventPayloadAsString()); m_onError(AWSError<S3Errors>(error)); return; } const auto& headers = GetEventHeaders(); auto messageTypeHeaderIter = headers.find(MESSAGE_TYPE_HEADER); if (messageTypeHeaderIter == headers.end()) { AWS_LOGSTREAM_WARN(SELECTOBJECTCONTENT_HANDLER_CLASS_TAG, "Header: " << MESSAGE_TYPE_HEADER << " not found in the message."); return; } switch (Aws::Utils::Event::Message::GetMessageTypeForName(messageTypeHeaderIter->second.GetEventHeaderValueAsString())) { case Aws::Utils::Event::Message::MessageType::EVENT: HandleEventInMessage(); break; case Aws::Utils::Event::Message::MessageType::REQUEST_LEVEL_ERROR: case Aws::Utils::Event::Message::MessageType::REQUEST_LEVEL_EXCEPTION: { HandleErrorInMessage(); break; } default: AWS_LOGSTREAM_WARN(SELECTOBJECTCONTENT_HANDLER_CLASS_TAG, "Unexpected message type: " << messageTypeHeaderIter->second.GetEventHeaderValueAsString()); break; } } void SelectObjectContentHandler::HandleEventInMessage() { const auto& headers = GetEventHeaders(); auto eventTypeHeaderIter = headers.find(EVENT_TYPE_HEADER); if (eventTypeHeaderIter == headers.end()) { AWS_LOGSTREAM_WARN(SELECTOBJECTCONTENT_HANDLER_CLASS_TAG, "Header: " << EVENT_TYPE_HEADER << " not found in the message."); return; } switch (SelectObjectContentEventMapper::GetSelectObjectContentEventTypeForName(eventTypeHeaderIter->second.GetEventHeaderValueAsString())) { case SelectObjectContentEventType::RECORDS: { RecordsEvent event(GetEventPayloadWithOwnership()); m_onRecordsEvent(event); break; } case SelectObjectContentEventType::STATS: { auto xmlDoc = XmlDocument::CreateFromXmlString(GetEventPayloadAsString()); if (!xmlDoc.WasParseSuccessful()) { AWS_LOGSTREAM_WARN(SELECTOBJECTCONTENT_HANDLER_CLASS_TAG, "Unable to generate a proper StatsEvent object from the response in XML format."); break; } m_onStatsEvent(StatsEvent(xmlDoc.GetRootElement())); break; } case SelectObjectContentEventType::PROGRESS: { auto xmlDoc = XmlDocument::CreateFromXmlString(GetEventPayloadAsString()); if (!xmlDoc.WasParseSuccessful()) { AWS_LOGSTREAM_WARN(SELECTOBJECTCONTENT_HANDLER_CLASS_TAG, "Unable to generate a proper ProgressEvent object from the response in XML format."); break; } m_onProgressEvent(ProgressEvent(xmlDoc.GetRootElement())); break; } case SelectObjectContentEventType::CONT: { m_onContinuationEvent(); break; } case SelectObjectContentEventType::END: { m_onEndEvent(); break; } default: AWS_LOGSTREAM_WARN(SELECTOBJECTCONTENT_HANDLER_CLASS_TAG, "Unexpected event type: " << eventTypeHeaderIter->second.GetEventHeaderValueAsString()); break; } } void SelectObjectContentHandler::HandleErrorInMessage() { const auto& headers = GetEventHeaders(); Aws::String errorCode; Aws::String errorMessage; auto errorHeaderIter = headers.find(ERROR_CODE_HEADER); if (errorHeaderIter == headers.end()) { errorHeaderIter = headers.find(EXCEPTION_TYPE_HEADER); if (errorHeaderIter == headers.end()) { AWS_LOGSTREAM_WARN(SELECTOBJECTCONTENT_HANDLER_CLASS_TAG, "Error type was not found in the event message."); return; } } errorCode = errorHeaderIter->second.GetEventHeaderValueAsString(); errorHeaderIter = headers.find(ERROR_MESSAGE_HEADER); if (errorHeaderIter == headers.end()) { errorHeaderIter = headers.find(EXCEPTION_TYPE_HEADER); if (errorHeaderIter == headers.end()) { AWS_LOGSTREAM_WARN(SELECTOBJECTCONTENT_HANDLER_CLASS_TAG, "Error description was not found in the event message."); return; } } errorMessage = errorHeaderIter->second.GetEventHeaderValueAsString(); MarshallError(errorCode, errorMessage); } void SelectObjectContentHandler::MarshallError(const Aws::String& errorCode, const Aws::String& errorMessage) { S3ErrorMarshaller errorMarshaller; AWSError<CoreErrors> error; if (errorCode.empty()) { error = AWSError<CoreErrors>(CoreErrors::UNKNOWN, "", errorMessage, false); } else { error = errorMarshaller.FindErrorByName(errorMessage.c_str()); if (error.GetErrorType() != CoreErrors::UNKNOWN) { AWS_LOGSTREAM_WARN(SELECTOBJECTCONTENT_HANDLER_CLASS_TAG, "Encountered AWSError '" << errorCode.c_str() << "': " << errorMessage.c_str()); error.SetExceptionName(errorCode); error.SetMessage(errorMessage); } else { AWS_LOGSTREAM_WARN(SELECTOBJECTCONTENT_HANDLER_CLASS_TAG, "Encountered Unknown AWSError '" << errorCode.c_str() << "': " << errorMessage.c_str()); error = AWSError<CoreErrors>(CoreErrors::UNKNOWN, errorCode, "Unable to parse ExceptionName: " + errorCode + " Message: " + errorMessage, false); } } m_onError(AWSError<S3Errors>(error)); } namespace SelectObjectContentEventMapper { static const int RECORDS_HASH = Aws::Utils::HashingUtils::HashString("Records"); static const int STATS_HASH = Aws::Utils::HashingUtils::HashString("Stats"); static const int PROGRESS_HASH = Aws::Utils::HashingUtils::HashString("Progress"); static const int CONT_HASH = Aws::Utils::HashingUtils::HashString("Cont"); static const int END_HASH = Aws::Utils::HashingUtils::HashString("End"); SelectObjectContentEventType GetSelectObjectContentEventTypeForName(const Aws::String& name) { int hashCode = Aws::Utils::HashingUtils::HashString(name.c_str()); if (hashCode == RECORDS_HASH) { return SelectObjectContentEventType::RECORDS; } else if (hashCode == STATS_HASH) { return SelectObjectContentEventType::STATS; } else if (hashCode == PROGRESS_HASH) { return SelectObjectContentEventType::PROGRESS; } else if (hashCode == CONT_HASH) { return SelectObjectContentEventType::CONT; } else if (hashCode == END_HASH) { return SelectObjectContentEventType::END; } return SelectObjectContentEventType::UNKNOWN; } Aws::String GetNameForSelectObjectContentEventType(SelectObjectContentEventType value) { switch (value) { case SelectObjectContentEventType::RECORDS: return "Records"; case SelectObjectContentEventType::STATS: return "Stats"; case SelectObjectContentEventType::PROGRESS: return "Progress"; case SelectObjectContentEventType::CONT: return "Cont"; case SelectObjectContentEventType::END: return "End"; default: return "Unknown"; } } } // namespace SelectObjectContentEventMapper } // namespace Model } // namespace S3 } // namespace Aws