NOTICE
The upcoming release of Featuretools 1.0.0 contains several breaking changes. Users are encouraged to test this version prior to release by installing from GitHub:
pip install https://github.com/alteryx/featuretools/archive/woodwork-integration.zip
For details on migrating to the new version, refer to the guide "Transitioning to Featuretools Version 1.0". Please report any issues in the Featuretools GitHub repo or by sending a message in the Alteryx Open Source Slack.
import json
import os
import tarfile
import tempfile

import dask.dataframe as dd
import numpy as np
import pandas as pd

from featuretools.entityset.relationship import Relationship
from featuretools.entityset.serialize import FORMATS
from featuretools.utils.gen_utils import check_schema_version, import_or_raise
from featuretools.utils.s3_utils import get_transport_params, use_smartopen_es
from featuretools.utils.wrangle import _is_local_tar, _is_s3, _is_url
from featuretools.variable_types import LatLong, find_variable_types


def _split_variable_type(type_info):
    '''Split a serialized variable type into its name and extra keyword arguments.

    Args:
        type_info (str or dict) : Either the type name itself, or a dictionary
            holding the type name under ``'value'`` next to additional
            keyword arguments.

    Returns:
        tuple : ``(type_name, type_kwargs)``. The input is never modified.
    '''
    if isinstance(type_info, str):
        return type_info, {}
    # Work on a copy so the caller's description can be deserialized again.
    type_kwargs = dict(type_info)
    type_name = type_kwargs.pop('value')
    return type_name, type_kwargs


def _restore_variable_properties(variable, properties):
    '''Copy interesting values and a custom description onto ``variable``.

    Args:
        variable (Variable) : Variable instance to update in place.
        properties (dict) : The ``'properties'`` section of a variable description.
    '''
    variable.interesting_values = pd.read_json(properties['interesting_values'], typ='series')
    variable_description = properties.get('description')
    # 'the "<name>"' is treated as the auto-generated text, so it is not stored.
    if variable_description is not None and variable_description != 'the "{}"'.format(variable.name):
        variable.description = variable_description


def description_to_variable(description, entity=None):
    '''Deserialize variable from variable description.

    Args:
        description (dict) : Description of :class:`.Variable`.
        entity (Entity) : Instance of :class:`.Entity` to add :class:`.Variable`. If entity is None, :class:`.Variable` will not be instantiated.

    Returns:
        variable (Variable or str) : The instantiated :class:`.Variable` when
            ``entity`` is given, otherwise the variable type name.
    '''
    type_name, type_kwargs = _split_variable_type(description['type'])
    if entity is None:
        return type_name

    variable_types = find_variable_types()
    # Fall back to the 'unknown' variable class for unrecognized type names.
    variable_class = variable_types.get(type_name, variable_types.get('unknown'))
    variable = variable_class(description['id'], entity, **type_kwargs)
    _restore_variable_properties(variable, description['properties'])
    return variable


def description_to_entity(description, entityset, path=None):
    '''Deserialize entity from entity description and add to entityset.

    Args:
        description (dict) : Description of :class:`.Entity`.
        entityset (EntitySet) : Instance of :class:`.EntitySet` to add :class:`.Entity`.
        path (str) : Root directory to serialized entityset. When falsy, an
            empty dataframe with the described columns is used instead.
    '''
    if path:
        dataframe = read_entity_data(description, path=path)
    else:
        dataframe = empty_dataframe(description)

    described = {variable['id']: variable for variable in description['variables']}
    type_names = {
        variable_id: description_to_variable(variable)
        for variable_id, variable in described.items()
    }
    es = entityset.entity_from_dataframe(
        description['id'],
        dataframe,
        index=description.get('index'),
        time_index=description.get('time_index'),
        secondary_time_index=description['properties'].get('secondary_time_index'),
        variable_types=type_names)

    for variable in es[description['id']].variables:
        _restore_variable_properties(variable, described[variable.id]['properties'])


def description_to_entityset(description, **kwargs):
    '''Deserialize entityset from data description.

    Args:
        description (dict) : Description of an :class:`.EntitySet`. Likely generated using :meth:`.serialize.entityset_to_description`
        kwargs (keywords): Additional keyword arguments to pass as keywords arguments to the underlying deserialization method.

    Returns:
        entityset (EntitySet) : Instance of :class:`.EntitySet`.
    '''
    check_schema_version(description, 'entityset')

    # Imported here rather than at module level, as in the original code.
    from featuretools.entityset import EntitySet

    # If data description was not read from disk, path is None.
    path = description.get('path')
    entityset = EntitySet(description['id'])

    last_time_index = []
    for entity in description['entities'].values():
        # Caller-supplied keywords are merged into the stored loading
        # parameters, which read_entity_data later reads.
        entity['loading_info']['params'].update(kwargs)
        # If path is None, an empty dataframe will be created for entity.
        description_to_entity(entity, entityset, path=path)
        if entity['properties']['last_time_index']:
            last_time_index.append(entity['id'])

    for relationship in description['relationships']:
        relationship = Relationship.from_dictionary(relationship, entityset)
        entityset.add_relationship(relationship)

    if last_time_index:
        entityset.add_last_time_indexes(updated_entities=last_time_index)

    return entityset


def empty_dataframe(description):
    '''Deserialize empty dataframe from entity description.

    Args:
        description (dict) : Description of :class:`.Entity`.

    Returns:
        df (DataFrame) : Empty dataframe for entity.
    '''
    columns = [variable['id'] for variable in description['variables']]
    dtypes = description['loading_info']['properties']['dtypes']
    return pd.DataFrame(columns=columns).astype(dtypes)


def read_entity_data(description, path):
    '''Read description data from disk.

    Args:
        description (dict) : Description of :class:`.Entity`.
        path (str): Location on disk to read entity data.

    Returns:
        df (DataFrame) : Instance of dataframe.

    Raises:
        ValueError : If the stored format is not csv, parquet or pickle.
    '''
    loading_info = description['loading_info']
    file = os.path.join(path, loading_info['location'])
    # Copy so that adjusting the parameters below never alters the description.
    kwargs = dict(loading_info.get('params', {}))
    load_format = loading_info['type']
    entity_type = loading_info.get('entity_type', 'pandas')
    dtypes = loading_info.get('properties', {}).get('dtypes')

    read_kwargs = {}
    if entity_type == 'dask':
        lib = dd
        read_kwargs['dtype'] = dtypes
    elif entity_type == 'koalas':
        import_error = 'Cannot load Koalas entityset - unable to import Koalas. ' \
                       'Consider doing a pip install with featuretools[koalas] to install Koalas with pip'
        lib = import_or_raise('databricks.koalas', import_error)
        read_kwargs['multiline'] = True
        kwargs['compression'] = str(kwargs['compression'])
    else:
        lib = pd

    if load_format == 'csv':
        dataframe = lib.read_csv(
            file,
            engine=kwargs['engine'],
            compression=kwargs['compression'],
            encoding=kwargs['encoding'],
            **read_kwargs
        )
    elif load_format == 'parquet':
        dataframe = lib.read_parquet(file, engine=kwargs['engine'])
    elif load_format == 'pickle':
        # SECURITY: unpickling executes arbitrary code; only load entitysets
        # from sources you trust.
        dataframe = pd.read_pickle(file, **kwargs)
    else:
        error = 'must be one of the following formats: {}'
        raise ValueError(error.format(', '.join(FORMATS)))

    if entity_type == 'koalas':
        # Translate the stored dtype names for Koalas into a fresh mapping,
        # leaving the description's own dtypes untouched.
        koalas_dtypes = {}
        for col, dtype in dtypes.items():
            if dtype == 'object':
                dtype = 'str'
            elif dtype == 'datetime64[ns]':
                dtype = np.datetime64
            koalas_dtypes[col] = dtype
        dataframe = dataframe.astype(koalas_dtypes)

    if load_format in ['parquet', 'csv']:
        # LatLong columns are parsed back from their "(a, b)"-style text form.
        latlongs = [
            var_description['id']
            for var_description in description['variables']
            if _split_variable_type(var_description['type'])[0] == LatLong.type_string
        ]

        def parse_latlong_tuple(x):
            return tuple(float(y) for y in x[1:-1].split(","))

        def parse_latlong_list(x):
            return [float(y) for y in x[1:-1].split(",")]

        for column in latlongs:
            if entity_type == 'dask':
                meta = (column, (float, float))
                dataframe[column] = dataframe[column].apply(parse_latlong_tuple, meta=meta)
            elif entity_type == 'koalas':
                dataframe[column] = dataframe[column].apply(parse_latlong_list)
            else:
                dataframe[column] = dataframe[column].apply(parse_latlong_tuple)

    return dataframe


def read_data_description(path):
    '''Read data description from a local directory.

    Args:
        path (str): Directory on disk containing `data_description.json`.

    Returns:
        description (dict) : Description of :class:`.EntitySet`, with the
            absolute directory stored under ``'path'``.

    Raises:
        AssertionError : If ``path`` does not exist.
    '''
    path = os.path.abspath(path)
    # Raised explicitly (not via ``assert``) so the check survives ``python -O``
    # while callers still see the same exception type.
    if not os.path.exists(path):
        raise AssertionError('"{}" does not exist'.format(path))
    filepath = os.path.join(path, 'data_description.json')
    with open(filepath, 'r') as file:
        description = json.load(file)
    description['path'] = path
    return description


def _is_within_directory(root, candidate):
    '''Return True when ``candidate`` resolves to a location inside ``root``.

    ``root`` must already be an absolute, symlink-resolved path.
    '''
    try:
        return os.path.commonpath([root, os.path.realpath(candidate)]) == root
    except ValueError:
        # Raised e.g. for paths on different drives: definitely outside root.
        return False


def _safe_extractall(tar, destination):
    '''Extract ``tar`` into ``destination``, refusing members that escape it.

    Args:
        tar (tarfile.TarFile) : Open archive to extract.
        destination (str) : Directory to extract into.

    Raises:
        tarfile.TarError : If a member (or a link target) points outside
            ``destination``.
    '''
    if hasattr(tarfile, 'data_filter'):
        # Interpreter ships extraction filters; let the stdlib enforce safety.
        tar.extractall(path=destination, filter='data')
        return

    root = os.path.realpath(destination)
    for member in tar.getmembers():
        target = os.path.join(root, member.name)
        unsafe = not _is_within_directory(root, target)
        if not unsafe and member.issym():
            # Symlink targets are relative to the directory holding the link.
            link_target = os.path.join(os.path.dirname(target), member.linkname)
            unsafe = not _is_within_directory(root, link_target)
        elif not unsafe and member.islnk():
            # Hard link targets are relative to the archive root.
            unsafe = not _is_within_directory(root, os.path.join(root, member.linkname))
        if unsafe:
            raise tarfile.TarError(
                'Refusing to extract "{}": path escapes the target directory'.format(member.name))
    tar.extractall(path=destination)


def read_entityset(path, profile_name=None, **kwargs):
    '''Read entityset from disk, S3 path, or URL.

    Args:
        path (str): Directory on disk, S3 path, or URL to read `data_description.json`.
        profile_name (str, bool): The AWS profile used to read from S3.
            Will default to None and search for AWS credentials.
            Set to False to use an anonymous profile.
        kwargs (keywords): Additional keyword arguments to pass as keyword arguments to the underlying deserialization method.

    Returns:
        entityset (EntitySet) : The deserialized :class:`.EntitySet`.
    '''
    is_remote = _is_url(path) or _is_s3(path)
    if not (is_remote or _is_local_tar(str(path))):
        # Plain directory on disk: read the description in place.
        data_description = read_data_description(path)
        return description_to_entityset(data_description, **kwargs)

    with tempfile.TemporaryDirectory() as tmpdir:
        local_path = path
        if is_remote:
            transport_params = get_transport_params(profile_name) if _is_s3(path) else None
            # Download the archive next to where it will be unpacked.
            local_path = os.path.join(tmpdir, "temporary_es")
            use_smartopen_es(local_path, path, transport_params)

        with tarfile.open(str(local_path)) as tar:
            _safe_extractall(tar, tmpdir)

        # Deserialize before the temporary directory is removed.
        data_description = read_data_description(tmpdir)
        return description_to_entityset(data_description, **kwargs)