import json
import os
import tarfile
import tempfile

import dask.dataframe as dd
import numpy as np
import pandas as pd

from featuretools.entityset.relationship import Relationship
from featuretools.entityset.serialize import FORMATS
from featuretools.utils.gen_utils import check_schema_version, import_or_raise
from featuretools.utils.s3_utils import get_transport_params, use_smartopen_es
from featuretools.utils.wrangle import _is_local_tar, _is_s3, _is_url
from featuretools.variable_types import LatLong, find_variable_types


def description_to_variable(description, entity=None):
    '''Deserialize variable from variable description.

    Args:
        description (dict) : Description of :class:`.Variable`.
        entity (Entity) : Instance of :class:`.Entity` to add :class:`.Variable`.
            If entity is None, :class:`.Variable` will not be instantiated.

    Returns:
        variable (Variable) : Returns :class:`.Variable`.
    '''
    is_type_string = isinstance(description['type'], str)
    variable = description['type'] if is_type_string else description['type'].pop('value')
    if entity is not None:
        variable_types = find_variable_types()
        variable_class = variable_types.get(variable, variable_types.get('unknown'))
        kwargs = {} if is_type_string else description['type']
        variable = variable_class(description['id'], entity, **kwargs)
        interesting_values = pd.read_json(description['properties']['interesting_values'], typ='series')
        variable.interesting_values = interesting_values
        variable_description = description['properties'].get('description')
        if variable_description is not None and variable_description != 'the "{}"'.format(variable.name):
            variable.description = variable_description
    return variable


def description_to_entity(description, entityset, path=None):
    '''Deserialize entity from entity description and add to entityset.

    Args:
        description (dict) : Description of :class:`.Entity`.
        entityset (EntitySet) : Instance of :class:`.EntitySet` to add :class:`.Entity`.
        path (str) : Root directory of serialized entityset.
    '''
    if path:
        dataframe = read_entity_data(description, path=path)
    else:
        dataframe = empty_dataframe(description)
    variable_types = {variable['id']: (description_to_variable(variable), variable)
                      for variable in description['variables']}
    es = entityset.entity_from_dataframe(
        description['id'],
        dataframe,
        index=description.get('index'),
        time_index=description.get('time_index'),
        secondary_time_index=description['properties'].get('secondary_time_index'),
        variable_types={variable: variable_types[variable][0] for variable in variable_types})
    for variable in es[description['id']].variables:
        interesting_values = variable_types[variable.id][1]['properties']['interesting_values']
        interesting_values = pd.read_json(interesting_values, typ="series")
        variable.interesting_values = interesting_values
        variable_description = variable_types[variable.id][1]['properties'].get('description')
        if variable_description is not None and variable_description != 'the "{}"'.format(variable.name):
            variable.description = variable_description
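
# Illustrative sketch (not part of the library API): the minimal description
# shape the two functions above consume. The column id, type value, and
# property values below are hypothetical; the dict form of 'type' and the
# series-JSON encoding of 'interesting_values' are inferred from how this
# module reads them.
#
#   description = {
#       'id': 'age',
#       'type': {'value': 'numeric'},
#       'properties': {
#           'interesting_values': pd.Series(dtype='object').to_json(),
#           'description': None,
#       },
#   }
#   # With entity=None only the type string is resolved and returned;
#   # with an Entity instance, a Variable is constructed and annotated.
#   variable = description_to_variable(description)  # -> 'numeric'
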
def description_to_entityset(description, **kwargs):
    '''Deserialize entityset from data description.

    Args:
        description (dict) : Description of an :class:`.EntitySet`. Likely generated
            using :meth:`.serialize.entityset_to_description`.
        kwargs (keywords): Additional keyword arguments to pass as keyword arguments
            to the underlying deserialization method.

    Returns:
        entityset (EntitySet) : Instance of :class:`.EntitySet`.
    '''
    check_schema_version(description, 'entityset')

    from featuretools.entityset import EntitySet

    # If data description was not read from disk, path is None.
    path = description.get('path')
    entityset = EntitySet(description['id'])

    last_time_index = []
    for entity in description['entities'].values():
        entity['loading_info']['params'].update(kwargs)
        # If path is None, an empty dataframe will be created for the entity.
        description_to_entity(entity, entityset, path=path)
        if entity['properties']['last_time_index']:
            last_time_index.append(entity['id'])

    for relationship in description['relationships']:
        relationship = Relationship.from_dictionary(relationship, entityset)
        entityset.add_relationship(relationship)

    if len(last_time_index):
        entityset.add_last_time_indexes(updated_entities=last_time_index)

    return entityset


def empty_dataframe(description):
    '''Deserialize empty dataframe from entity description.

    Args:
        description (dict) : Description of :class:`.Entity`.

    Returns:
        df (DataFrame) : Empty dataframe for entity.
    '''
    columns = [variable['id'] for variable in description['variables']]
    dtypes = description['loading_info']['properties']['dtypes']
    return pd.DataFrame(columns=columns).astype(dtypes)
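
# Illustrative sketch (hypothetical values): the slice of an entity
# description that empty_dataframe reads. It builds a zero-row frame whose
# columns carry the serialized dtypes, so schema survives even when no data
# is loaded.
#
#   entity_description = {
#       'variables': [{'id': 'id'}, {'id': 'amount'}],
#       'loading_info': {'properties': {'dtypes': {'id': 'int64', 'amount': 'float64'}}},
#   }
#   df = empty_dataframe(entity_description)  # empty, but df.dtypes is preserved
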
def read_entity_data(description, path):
    '''Read description data from disk.

    Args:
        description (dict) : Description of :class:`.Entity`.
        path (str): Location on disk to read entity data.

    Returns:
        df (DataFrame) : Instance of dataframe.
    '''
    file = os.path.join(path, description['loading_info']['location'])
    kwargs = description['loading_info'].get('params', {})
    load_format = description['loading_info']['type']
    entity_type = description['loading_info'].get('entity_type', 'pandas')
    dtypes = description['loading_info'].get('properties', {}).get('dtypes')
    read_kwargs = {}
    if entity_type == 'dask':
        lib = dd
        read_kwargs['dtype'] = dtypes
    elif entity_type == 'koalas':
        import_error = 'Cannot load Koalas entityset - unable to import Koalas. ' \
                       'Consider doing a pip install with featuretools[koalas] to install Koalas with pip'
        lib = import_or_raise('databricks.koalas', import_error)
        read_kwargs['multiline'] = True
        kwargs['compression'] = str(kwargs['compression'])
    else:
        lib = pd
    if load_format == 'csv':
        dataframe = lib.read_csv(
            file,
            engine=kwargs['engine'],
            compression=kwargs['compression'],
            encoding=kwargs['encoding'],
            **read_kwargs
        )
    elif load_format == 'parquet':
        dataframe = lib.read_parquet(file, engine=kwargs['engine'])
    elif load_format == 'pickle':
        dataframe = pd.read_pickle(file, **kwargs)
    else:
        error = 'must be one of the following formats: {}'
        raise ValueError(error.format(', '.join(FORMATS)))
    if entity_type == 'koalas':
        for col, dtype in dtypes.items():
            if dtype == 'object':
                dtypes[col] = 'str'
            if dtype == 'datetime64[ns]':
                dtypes[col] = np.datetime64
        dataframe = dataframe.astype(dtypes)

    if load_format in ['parquet', 'csv']:
        latlongs = []
        for var_description in description['variables']:
            if var_description['type']['value'] == LatLong.type_string:
                latlongs.append(var_description["id"])

        def parse_latlong_tuple(x):
            return tuple(float(y) for y in x[1:-1].split(","))

        def parse_latlong_list(x):
            return list(float(y) for y in x[1:-1].split(","))

        for column in latlongs:
            if entity_type == 'dask':
                meta = (column, tuple([float, float]))
                dataframe[column] = dataframe[column].apply(parse_latlong_tuple, meta=meta)
            elif entity_type == 'koalas':
                dataframe[column] = dataframe[column].apply(parse_latlong_list)
            else:
                dataframe[column] = dataframe[column].apply(parse_latlong_tuple)

    return dataframe


def read_data_description(path):
    '''Read data description from disk.

    Args:
        path (str): Location on disk to read `data_description.json`.

    Returns:
        description (dict) : Description of :class:`.EntitySet`.
    '''
    path = os.path.abspath(path)
    assert os.path.exists(path), '"{}" does not exist'.format(path)
    filepath = os.path.join(path, 'data_description.json')
    with open(filepath, 'r') as file:
        description = json.load(file)
    description['path'] = path
    return description


def read_entityset(path, profile_name=None, **kwargs):
    '''Read entityset from disk, S3 path, or URL.

    Args:
        path (str): Directory on disk, S3 path, or URL to read `data_description.json`.
        profile_name (str, bool): The AWS profile specified to read from S3.
            Will default to None and search for AWS credentials.
            Set to False to use an anonymous profile.
        kwargs (keywords): Additional keyword arguments to pass as keyword arguments
            to the underlying deserialization method.
    '''
    if _is_url(path) or _is_s3(path) or _is_local_tar(str(path)):
        with tempfile.TemporaryDirectory() as tmpdir:
            local_path = path
            transport_params = None

            if _is_s3(path):
                transport_params = get_transport_params(profile_name)

            if _is_s3(path) or _is_url(path):
                local_path = os.path.join(tmpdir, "temporary_es")
                use_smartopen_es(local_path, path, transport_params)

            with tarfile.open(str(local_path)) as tar:
                tar.extractall(path=tmpdir)

            data_description = read_data_description(tmpdir)
            return description_to_entityset(data_description, **kwargs)
    else:
        data_description = read_data_description(path)
        return description_to_entityset(data_description, **kwargs)
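
# Illustrative usage sketch: reading back an entityset previously written
# with EntitySet.to_csv / to_parquet / to_pickle. The paths and bucket name
# below are hypothetical.
#
#   import featuretools as ft
#
#   es = ft.read_entityset('/data/my_entityset')                # local directory
#   es = ft.read_entityset('s3://my-bucket/my_entityset.tar',   # S3 tarball,
#                          profile_name=False)                  # anonymous profile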