from __future__ import absolute_import
import re
import sys
import copy
import logging
from . import tools
from datetime import date, datetime
import enum
import six
# Module-level logger, named after this module
logger = logging.getLogger(__name__)
# URI prefix matched by ExternalDataInfo.get_mds_key; the key is whatever follows it
MDS_URI_PREFIX = 'https://storage.yandex-team.ru/get-devtools/'
def apply(func, value, apply_to_keys=False):
    """
    Recursively walks value and builds a new structure with func applied to every leaf
    :param func: callable invoked as func(leaf, path), where path is the list of
        list indices and dict keys leading from the root to that leaf
    :param value: either a primitive object or a nested one (lists, tuples, dicts)
    :param apply_to_keys: when True, dict keys are passed through func as well
        (with the same path as the value stored under that key)
    :return: the transformed structure; lists and tuples both come back as lists
    """
    def _walk(node, trail):
        if isinstance(node, (list, tuple)):
            # every element gets its own path object
            return [_walk(element, trail + [position]) for position, element in enumerate(node)]

        # leaves and special serialized objects pointing to some external place
        # are handed to func as a whole
        if not isinstance(node, dict) or is_external(node):
            return func(node, trail)

        converted = {}
        # plain dicts are traversed in sorted key order
        for name, child in sorted(node.items(), key=lambda pair: pair[0]):
            child_trail = trail + [name]
            # the value is processed before the key
            new_child = _walk(child, child_trail)
            new_name = _walk(name, child_trail) if apply_to_keys else name
            converted[new_name] = new_child
        return converted

    return _walk(value, [])
def is_coroutine(val):
    """
    Tells whether val is a coroutine function or a coroutine object
    Always False on Python 2, where asyncio is never imported
    :param val: any object
    """
    if sys.version_info[0] >= 3:
        # imported lazily so that the module still loads on Python 2
        import asyncio
        return asyncio.iscoroutinefunction(val) or asyncio.iscoroutine(val)
    return False
def serialize(value):
    """
    Converts value into a json-convertible object
    Every leaf (and every dict key) of value is checked and converted via apply
    :param value: object to be serialized
    :return: structure built by apply from the converted leaves
    :raises ValueError: if some component of value has no supported representation
    """
    def _to_json_leaf(item, _path):
        if item is None:
            return None
        # text and bytes go through tools.to_utf8
        if isinstance(item, (six.string_types, bytes)):
            return tools.to_utf8(item)
        # checked before the integer test, so integer-based enum members are stringified too
        if isinstance(item, enum.Enum):
            return str(item)
        is_plain_number = isinstance(item, six.integer_types) or type(item) in (float, bool)
        if is_plain_number:
            return item
        # external objects are reduced to a plain dict
        if is_external(item):
            return dict(item)
        if isinstance(item, (date, datetime)):
            return repr(item)
        # coroutines are replaced with None
        if is_coroutine(item):
            return None
        raise ValueError("Cannot serialize value '{}' of type {}".format(item, type(item)))

    return apply(_to_json_leaf, value, apply_to_keys=True)
def is_external(value):
    """
    Tells whether value is a serialized object pointing to some external place
    :param value: any object
    :return: True if value is a dict (or a dict subclass) containing the "uri" key,
        False otherwise
    """
    # Direct membership test: no need to go through keys(), which builds a whole list on Python 2
    return isinstance(value, dict) and "uri" in value
class ExternalSchema(object):
    """
    URI scheme prefixes used in the "uri" field of serialized external objects
    """
    # scheme written by ExternalDataInfo.serialize_file
    File = "file"
    # scheme written by ExternalDataInfo.serialize_resource
    SandboxResource = "sbr"
    # scheme written by ExternalDataInfo.serialize_delayed
    Delayed = "delayed"
    # prefix tested by ExternalDataInfo.is_http
    HTTP = "http"
class CanonicalObject(dict):
    """
    dict subclass returned by ExternalDataInfo._serialize that refuses to be iterated:
    iter() / for-loops over an instance raise TypeError, key access works as for a dict
    """

    def __iter__(self):
        message = "Iterating canonical object is not implemented"
        raise TypeError(message)
class ExternalDataInfo(object):
    """
    Accessor over a serialized external object (a dict carrying the "uri" key),
    plus class-level factories producing such objects
    """

    def __init__(self, data):
        """
        :param data: dict for which is_external(data) holds; it is stored as is, not copied
        """
        assert is_external(data)
        self._data = data

    def __str__(self):
        # Name each known schema explicitly, so that delayed and http objects
        # are not reported as sandbox resources
        if self.is_file:
            type_str = "File"
        elif self.is_sandbox_resource:
            type_str = "Sandbox resource"
        elif self.is_delayed:
            type_str = "Delayed"
        elif self.is_http:
            type_str = "HTTP"
        else:
            type_str = "External"
        return "{}({})".format(type_str, self.path)

    def __repr__(self):
        return str(self)

    @property
    def uri(self):
        """Value stored under the "uri" key"""
        return self._data["uri"]

    @property
    def checksum(self):
        """Value stored under the "checksum" key, None if absent"""
        return self._data.get("checksum")

    @property
    def is_file(self):
        """True if the uri starts with ExternalSchema.File"""
        return self.uri.startswith(ExternalSchema.File)

    @property
    def is_sandbox_resource(self):
        """True if the uri starts with ExternalSchema.SandboxResource"""
        return self.uri.startswith(ExternalSchema.SandboxResource)

    @property
    def is_delayed(self):
        """True if the uri starts with ExternalSchema.Delayed"""
        return self.uri.startswith(ExternalSchema.Delayed)

    @property
    def is_http(self):
        """True if the uri starts with ExternalSchema.HTTP (an "https" uri matches as well)"""
        return self.uri.startswith(ExternalSchema.HTTP)

    @property
    def path(self):
        """
        Part of the uri following "://"
        If the uri does not contain exactly one "://", an error is logged
        and the whole uri is returned instead
        """
        if self.uri.count("://") != 1:
            logger.error("Invalid external data uri: '%s'", self.uri)
            return self.uri
        # exactly one separator is guaranteed at this point
        _, _, path = self.uri.partition("://")
        return path

    def get_mds_key(self):
        """
        Extracts the text that follows MDS_URI_PREFIX in the uri, up to the first "#"
        or the end of the uri
        :raises AssertionError: if the uri does not start with MDS_URI_PREFIX
            (the is_http precondition is checked with an assert statement)
        """
        assert self.is_http
        m = re.match(re.escape(MDS_URI_PREFIX) + r'(.*?)($|#)', self.uri)
        if m:
            return m.group(1)
        raise AssertionError("Failed to extract mds key properly from '{}'".format(self.uri))

    @property
    def size(self):
        """Value stored under the "size" key, None if absent"""
        return self._data.get("size")

    def serialize(self):
        """Returns the underlying dict itself (not a copy)"""
        return self._data

    @classmethod
    def _serialize(cls, schema, path, checksum=None, attrs=None):
        """
        Builds a CanonicalObject with "uri" set to "<schema>://<path>"
        :param schema: uri scheme, one of the ExternalSchema values
        :param path: text placed after "://"
        :param checksum: stored under "checksum" when truthy
        :param attrs: dict of extra entries merged into the result when non-empty
        """
        res = CanonicalObject({"uri": "{}://{}".format(schema, path)})
        if checksum:
            res["checksum"] = checksum
        if attrs:
            res.update(attrs)
        return res

    @classmethod
    def serialize_file(cls, path, checksum=None, diff_tool=None, local=False, diff_file_name=None,
                       diff_tool_timeout=None, size=None):
        """
        Builds a "file://" external object
        diff_tool, local, diff_file_name and diff_tool_timeout are recorded only when truthy,
        size is recorded whenever it is not None (so 0 is kept)
        """
        attrs = {}
        if diff_tool:
            attrs["diff_tool"] = diff_tool
        if local:
            attrs["local"] = local
        if diff_file_name:
            attrs["diff_file_name"] = diff_file_name
        if diff_tool_timeout:
            attrs["diff_tool_timeout"] = diff_tool_timeout
        if size is not None:
            attrs["size"] = size
        return cls._serialize(ExternalSchema.File, path, checksum, attrs=attrs)

    @classmethod
    def serialize_resource(cls, id, checksum=None):
        """Builds an "sbr://<id>" external object"""
        return cls._serialize(ExternalSchema.SandboxResource, id, checksum)

    @classmethod
    def serialize_delayed(cls, upload_id, checksum):
        """Builds a "delayed://<upload_id>" external object"""
        return cls._serialize(ExternalSchema.Delayed, upload_id, checksum)

    def get(self, key, default=None):
        """dict-style lookup in the underlying data"""
        return self._data.get(key, default)