import json
import os
import tarfile
import tempfile
from pathlib import Path
import pandas as pd
from featuretools.entityset.relationship import Relationship
from featuretools.entityset.serialize import FORMATS
from featuretools.utils.gen_utils import check_schema_version
from featuretools.utils.s3_utils import get_transport_params, use_smartopen_es
from featuretools.utils.wrangle import _is_s3, _is_url
from featuretools.variable_types.variable import LatLong, find_variable_types
def description_to_variable(description, entity=None):
'''Deserialize variable from variable description.
Args:
description (dict) : Description of :class:`.Variable`.
entity (Entity) : Instance of :class:`.Entity` to add :class:`.Variable`. If entity is None, :class:`.Variable` will not be instantiated.
Returns:
variable (Variable) : Returns :class:`.Variable`.
'''
variable_types = find_variable_types()
is_type_string = isinstance(description['type'], str)
    variable_type = description['type'] if is_type_string else description['type'].pop('value')
    variable = variable_types.get(variable_type, variable_types.get('None'))  # 'None' maps to the Unknown variable type
if entity is not None:
kwargs = {} if is_type_string else description['type']
variable = variable(description['id'], entity, **kwargs)
interesting_values = pd.read_json(description['properties']['interesting_values'], typ='series')
variable.interesting_values = interesting_values
return variable
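
# A sketch of the per-variable description shape that description_to_variable
# consumes, inferred from the fields read above (the exact serialized form may
# differ between featuretools versions; values here are illustrative only):
#
#     {
#         'id': 'age',
#         'type': {'value': 'numeric'},             # or a bare string, e.g. 'numeric'
#         'properties': {'interesting_values': '{}'},
#     }
#
# With entity=None the matching variable type class is returned; with an
# entity, an instance is constructed and its interesting_values restored.
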
def description_to_entity(description, entityset, path=None):
'''Deserialize entity from entity description and add to entityset.
Args:
description (dict) : Description of :class:`.Entity`.
entityset (EntitySet) : Instance of :class:`.EntitySet` to add :class:`.Entity`.
        path (str) : Root directory of the serialized entityset.
'''
if path:
dataframe = read_entity_data(description, path=path)
else:
dataframe = empty_dataframe(description)
variable_types = {variable['id']: (description_to_variable(variable), variable)
for variable in description['variables']}
es = entityset.entity_from_dataframe(
description['id'],
dataframe,
index=description.get('index'),
time_index=description.get('time_index'),
secondary_time_index=description['properties'].get('secondary_time_index'),
variable_types={variable: variable_types[variable][0] for variable in variable_types})
for variable in es[description['id']].variables:
interesting_values = variable_types[variable.id][1]['properties']['interesting_values']
        interesting_values = pd.read_json(interesting_values, typ='series')
variable.interesting_values = interesting_values
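
# For reference, a minimal entity description compatible with the lookups in
# description_to_entity (illustrative; real descriptions come from the
# serialize side and may carry additional fields):
#
#     {
#         'id': 'customers',
#         'index': 'customer_id',
#         'time_index': None,
#         'properties': {'secondary_time_index': None, 'last_time_index': False},
#         'loading_info': {'location': 'data/customers.csv',
#                          'type': 'csv',
#                          'params': {},
#                          'properties': {'dtypes': {'customer_id': 'int64'}}},
#         'variables': [...],
#     }
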
def description_to_entityset(description, **kwargs):
'''Deserialize entityset from data description.
Args:
description (dict) : Description of an :class:`.EntitySet`. Likely generated using :meth:`.serialize.entityset_to_description`
        kwargs (keywords): Additional keyword arguments to pass as keyword arguments to the underlying deserialization method.
Returns:
entityset (EntitySet) : Instance of :class:`.EntitySet`.
'''
check_schema_version(description, 'entityset')
from featuretools.entityset import EntitySet
# If data description was not read from disk, path is None.
path = description.get('path')
entityset = EntitySet(description['id'])
last_time_index = []
for entity in description['entities'].values():
entity['loading_info']['params'].update(kwargs)
# If path is None, an empty dataframe will be created for entity.
description_to_entity(entity, entityset, path=path)
if entity['properties']['last_time_index']:
last_time_index.append(entity['id'])
for relationship in description['relationships']:
relationship = Relationship.from_dictionary(relationship, entityset)
entityset.add_relationship(relationship)
    if last_time_index:
entityset.add_last_time_indexes(updated_entities=last_time_index)
return entityset
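
# Hedged round-trip sketch for description_to_entityset: descriptions normally
# come from the serialize side, so an in-memory structural copy (no data, since
# the description carries no 'path') could be built roughly like this, assuming
# entityset_to_description is available in the serialize module:
#
#     from featuretools.entityset.serialize import entityset_to_description
#     description = entityset_to_description(es)
#     es_structure = description_to_entityset(description)
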
def empty_dataframe(description):
'''Deserialize empty dataframe from entity description.
Args:
description (dict) : Description of :class:`.Entity`.
Returns:
df (DataFrame) : Empty dataframe for entity.
'''
columns = [variable['id'] for variable in description['variables']]
dtypes = description['loading_info']['properties']['dtypes']
return pd.DataFrame(columns=columns).astype(dtypes)
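
# For example, a description whose variables are ['id', 'amount'] with dtypes
# {'id': 'int64', 'amount': 'float64'} yields a zero-row frame carrying those
# columns and dtypes, so empty_dataframe preserves the entity schema without data.
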
def read_entity_data(description, path):
'''Read description data from disk.
Args:
description (dict) : Description of :class:`.Entity`.
path (str): Location on disk to read entity data.
Returns:
df (DataFrame) : Instance of dataframe.
'''
file = os.path.join(path, description['loading_info']['location'])
kwargs = description['loading_info'].get('params', {})
load_format = description['loading_info']['type']
if load_format == 'csv':
dataframe = pd.read_csv(
file,
engine=kwargs['engine'],
compression=kwargs['compression'],
encoding=kwargs['encoding'],
)
elif load_format == 'parquet':
dataframe = pd.read_parquet(file, engine=kwargs['engine'])
elif load_format == 'pickle':
dataframe = pd.read_pickle(file, **kwargs)
else:
        error = "unknown loading type '{}': must be one of the following formats: {}"
        raise ValueError(error.format(load_format, ', '.join(FORMATS)))
dtypes = description['loading_info']['properties']['dtypes']
dataframe = dataframe.astype(dtypes)
if load_format in ['parquet', 'csv']:
latlongs = []
for var_description in description['variables']:
if var_description['type']['value'] == LatLong.type_string:
                latlongs.append(var_description['id'])
def parse_latlong(x):
return tuple(float(y) for y in x[1:-1].split(","))
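        # CSV and parquet round-trip latlong tuples as strings such as
        # "(1.0, 2.0)"; parse_latlong drops the parentheses and rebuilds the
        # tuple of floats, e.g. parse_latlong("(1.0, 2.0)") == (1.0, 2.0).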
for column in latlongs:
dataframe[column] = dataframe[column].apply(parse_latlong)
return dataframe
def read_data_description(path):
    '''Read data description from disk.
    Args:
        path (str): Location on disk to read `data_description.json`.
Returns:
description (dict) : Description of :class:`.EntitySet`.
'''
path = os.path.abspath(path)
assert os.path.exists(path), '"{}" does not exist'.format(path)
file = os.path.join(path, 'data_description.json')
    with open(file, 'r') as f:
        description = json.load(f)
description['path'] = path
return description
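
# The on-disk layout read_data_description expects, inferred from the fields
# used above and the serialize side (names other than data_description.json
# follow the serializer's convention):
#
#     path/
#         data_description.json
#         data/
#             <one .csv, .parquet, or .pickle file per entity>
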
def read_entityset(path, profile_name=None, **kwargs):
'''Read entityset from disk, S3 path, or URL.
Args:
path (str): Directory on disk, S3 path, or URL to read `data_description.json`.
        profile_name (str, bool): The AWS profile used to read from S3. Will default to None and search for AWS credentials.
            Set to False to use an anonymous profile.
        kwargs (keywords): Additional keyword arguments to pass as keyword arguments to the underlying deserialization method.
    Returns:
        entityset (EntitySet) : Instance of :class:`.EntitySet`.
    '''
if _is_url(path) or _is_s3(path):
with tempfile.TemporaryDirectory() as tmpdir:
file_name = Path(path).name
file_path = os.path.join(tmpdir, file_name)
transport_params = None
if _is_s3(path):
transport_params = get_transport_params(profile_name)
use_smartopen_es(file_path, path, transport_params)
with tarfile.open(str(file_path)) as tar:
tar.extractall(path=tmpdir)
data_description = read_data_description(tmpdir)
return description_to_entityset(data_description, **kwargs)
else:
data_description = read_data_description(path)
return description_to_entityset(data_description, **kwargs)
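
# Hedged usage sketch for read_entityset (paths and bucket names are
# illustrative; the function is also exposed at the package top level):
#
#     import featuretools as ft
#     es = ft.read_entityset('path/to/entityset')  # local directory
#     es = ft.read_entityset('s3://my-bucket/es.tar', profile_name=False)  # anonymous S3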