# Source code for featuretools.entityset.deserialize

import json
import os
import tarfile
import tempfile
from pathlib import Path

import dask.dataframe as dd
import numpy as np
import pandas as pd

from featuretools.entityset.relationship import Relationship
from featuretools.entityset.serialize import FORMATS
from featuretools.utils.gen_utils import check_schema_version, import_or_raise
from featuretools.utils.s3_utils import get_transport_params, use_smartopen_es
from featuretools.utils.wrangle import _is_s3, _is_url
from featuretools.variable_types import LatLong, find_variable_types


def description_to_variable(description, entity=None):
    '''Deserialize variable from variable description.

    The description is left unmodified, so the same description can be
    deserialized more than once.

    Args:
        description (dict) : Description of :class:`.Variable`. ``description['type']`` is either a
            type string, or a dict holding the type string under ``'value'`` together with any
            extra keyword arguments for the variable class.
        entity (Entity) : Instance of :class:`.Entity` to add :class:`.Variable`. If entity is None, :class:`.Variable` will not be instantiated.

    Returns:
        variable (Variable or str) : Returns :class:`.Variable` when an entity is given, otherwise
            the variable's type string.
    '''
    type_description = description['type']
    if isinstance(type_description, str):
        variable, kwargs = type_description, {}
    else:
        # Work on a copy: popping 'value' out of the caller's dict would make a
        # second pass over the same description fail with a KeyError.
        kwargs = dict(type_description)
        variable = kwargs.pop('value')
    if entity is not None:
        variable_types = find_variable_types()
        # Unrecognized type strings fall back to whatever is registered as 'unknown'.
        variable_class = variable_types.get(variable, variable_types.get('unknown'))
        variable = variable_class(description['id'], entity, **kwargs)
        interesting_values = pd.read_json(description['properties']['interesting_values'], typ='series')
        variable.interesting_values = interesting_values
    return variable


def description_to_entity(description, entityset, path=None):
    '''Build an entity from its description and register it on the entityset.

    Args:
        description (dict) : Description of :class:`.Entity`.
        entityset (EntitySet) : Instance of :class:`.EntitySet` that receives the :class:`.Entity`.
        path (str) : Root directory to serialized entityset. When falsy, the entity is
            backed by an empty dataframe instead of data read from disk.
    '''
    dataframe = read_entity_data(description, path=path) if path else empty_dataframe(description)

    # Two lookups keyed by variable id: the deserialized type, and the raw description.
    types_by_id = {}
    descriptions_by_id = {}
    for var_description in description['variables']:
        var_id = var_description['id']
        types_by_id[var_id] = description_to_variable(var_description)
        descriptions_by_id[var_id] = var_description

    entity_id = description['id']
    updated = entityset.entity_from_dataframe(
        entity_id,
        dataframe,
        index=description.get('index'),
        time_index=description.get('time_index'),
        secondary_time_index=description['properties'].get('secondary_time_index'),
        variable_types=types_by_id,
    )

    # Interesting values are restored after the entity has been created.
    for variable in updated[entity_id].variables:
        serialized = descriptions_by_id[variable.id]['properties']['interesting_values']
        variable.interesting_values = pd.read_json(serialized, typ="series")


def description_to_entityset(description, **kwargs):
    '''Rebuild an entityset from its data description.

    Args:
        description (dict) : Description of an :class:`.EntitySet`. Likely generated using :meth:`.serialize.entityset_to_description`
        kwargs (keywords): Additional keyword arguments, merged into each entity's loading parameters before its data is read.

    Returns:
        entityset (EntitySet) : Instance of :class:`.EntitySet`.
    '''
    check_schema_version(description, 'entityset')

    # NOTE(review): imported here rather than at module level, presumably to
    # avoid a circular import - confirm before moving it.
    from featuretools.entityset import EntitySet

    # A description that was not read from disk carries no path; each entity
    # then gets an empty dataframe.
    path = description.get('path')
    entityset = EntitySet(description['id'])

    entities_with_last_time_index = []
    for entity_description in description['entities'].values():
        entity_description['loading_info']['params'].update(kwargs)
        description_to_entity(entity_description, entityset, path=path)
        if entity_description['properties']['last_time_index']:
            entities_with_last_time_index.append(entity_description['id'])

    for relationship_description in description['relationships']:
        entityset.add_relationship(
            Relationship.from_dictionary(relationship_description, entityset))

    if entities_with_last_time_index:
        entityset.add_last_time_indexes(updated_entities=entities_with_last_time_index)

    return entityset


def empty_dataframe(description):
    '''Build a zero-row dataframe with an entity's columns and dtypes.

    Args:
        description (dict) : Description of :class:`.Entity`.

    Returns:
        df (DataFrame) : Empty dataframe with one column per described variable,
            cast to the dtypes recorded in the loading info.
    '''
    column_names = [var_description['id'] for var_description in description['variables']]
    column_dtypes = description['loading_info']['properties']['dtypes']
    frame = pd.DataFrame(columns=column_names)
    return frame.astype(column_dtypes)


def read_entity_data(description, path):
    '''Read description data from disk.

    csv and parquet data is read with the library matching the description's
    ``entity_type`` (pandas by default, or Dask / Koalas); pickle data is always
    read with pandas. The description is treated as read-only: loading
    parameters and dtypes are copied before the Koalas-specific adjustments
    are applied.

    Args:
        description (dict) : Description of :class:`.Entity`.
        path (str): Location on disk to read entity data.

    Returns:
        df (DataFrame) : Instance of dataframe.

    Raises:
        ValueError : If the description's format is not csv, parquet, or pickle.
    '''
    loading_info = description['loading_info']
    file = os.path.join(path, loading_info['location'])
    # Copied so the Koalas compression tweak below stays out of the description.
    kwargs = dict(loading_info.get('params', {}))
    load_format = loading_info['type']
    entity_type = loading_info.get('entity_type', 'pandas')
    read_kwargs = {}
    if entity_type == 'dask':
        lib = dd
    elif entity_type == 'koalas':
        import_error = 'Cannot load Koalas entityset - unable to import Koalas. ' \
                       'Consider doing a pip install with featuretools[koalas] to install Koalas with pip'
        lib = import_or_raise('databricks.koalas', import_error)
        read_kwargs['multiline'] = True
        kwargs['compression'] = str(kwargs['compression'])
    else:
        lib = pd

    if load_format == 'csv':
        dataframe = lib.read_csv(
            file,
            engine=kwargs['engine'],
            compression=kwargs['compression'],
            encoding=kwargs['encoding'],
            **read_kwargs
        )
    elif load_format == 'parquet':
        dataframe = lib.read_parquet(file, engine=kwargs['engine'])
    elif load_format == 'pickle':
        # NOTE(review): unpickling can execute arbitrary code; only load pickled
        # entitysets that come from a trusted source.
        dataframe = pd.read_pickle(file, **kwargs)
    else:
        error = 'must be one of the following formats: {}'
        raise ValueError(error.format(', '.join(FORMATS)))

    # Copied as well: the np.datetime64 substitution below must not end up in
    # the caller's description.
    dtypes = dict(loading_info['properties']['dtypes'])
    if entity_type == 'koalas':
        for col, dtype in list(dtypes.items()):
            if dtype == 'object':
                dtypes[col] = 'string'
            elif dtype == 'datetime64[ns]':
                dtypes[col] = np.datetime64
    dataframe = dataframe.astype(dtypes)

    # LatLong columns are parsed back from their "(lat, long)"-style text form
    # for csv and parquet only; pickle data is left untouched.
    if load_format in ['parquet', 'csv']:
        latlongs = []
        for var_description in description['variables']:
            var_type = var_description['type']
            # The type is either a bare type string or a dict with the type
            # string under 'value'; description_to_variable accepts both.
            type_string = var_type if isinstance(var_type, str) else var_type['value']
            if type_string == LatLong.type_string:
                latlongs.append(var_description['id'])

        def parse_latlong_tuple(x):
            return tuple(float(y) for y in x[1:-1].split(","))

        def parse_latlong_list(x):
            return [float(y) for y in x[1:-1].split(",")]

        for column in latlongs:
            if entity_type == 'dask':
                meta = (column, (float, float))
                dataframe[column] = dataframe[column].apply(parse_latlong_tuple,
                                                            meta=meta)
            elif entity_type == 'koalas':
                # Koalas columns get lists rather than tuples.
                dataframe[column] = dataframe[column].apply(parse_latlong_list)
            else:
                dataframe[column] = dataframe[column].apply(parse_latlong_tuple)

    return dataframe


def read_data_description(path):
    '''Read data description from a directory on disk.

    Args:
        path (str): Directory on disk that contains `data_description.json`.

    Returns:
        description (dict) : Description of :class:`.EntitySet`, with the absolute
            directory path stored under ``'path'``.

    Raises:
        AssertionError : If ``path`` does not exist.
    '''
    path = os.path.abspath(path)
    # Raised explicitly instead of via ``assert`` so the check survives
    # ``python -O``; the exception type is kept for existing callers.
    if not os.path.exists(path):
        raise AssertionError('"{}" does not exist'.format(path))
    description_file = os.path.join(path, 'data_description.json')
    with open(description_file, 'r') as handle:
        description = json.load(handle)
    description['path'] = path
    return description


def _is_within_directory(directory, target):
    '''Return True if ``target`` resolves to ``directory`` or to a location beneath it.'''
    directory = os.path.realpath(directory)
    target = os.path.realpath(target)
    return os.path.commonpath([directory, target]) == directory


def _safe_extract(tar, destination):
    '''Extract ``tar`` into ``destination``, refusing members that would escape it.'''
    error = 'refusing to extract "{}": it resolves outside of "{}"'
    for member in tar.getmembers():
        member_path = os.path.join(destination, member.name)
        if not _is_within_directory(destination, member_path):
            raise ValueError(error.format(member.name, destination))
        # Links are checked too: a link pointing outside the destination could
        # redirect a later member's write.
        if member.issym():
            # Symlink targets are relative to the directory holding the link.
            link_target = os.path.join(os.path.dirname(member_path), member.linkname)
        elif member.islnk():
            # Hard link targets are relative to the archive root.
            link_target = os.path.join(destination, member.linkname)
        else:
            continue
        if not _is_within_directory(destination, link_target):
            raise ValueError(error.format(member.name, destination))
    tar.extractall(path=destination)


def read_entityset(path, profile_name=None, **kwargs):
    '''Read entityset from disk, S3 path, or URL.

    For an S3 path or URL, the archive is fetched into a temporary directory,
    unpacked there, and deserialized before the directory is removed.

    Args:
        path (str): Directory on disk, S3 path, or URL to read `data_description.json`.
        profile_name (str, bool): The AWS profile used to read from S3.
            Will default to None and search for AWS credentials.
            Set to False to use an anonymous profile.
        kwargs (keywords): Additional keyword arguments to pass as keyword arguments to the underlying deserialization method.

    Returns:
        entityset (EntitySet) : Instance of :class:`.EntitySet`.

    Raises:
        ValueError : If a downloaded archive contains a member that would be
            extracted outside of the temporary directory.
    '''
    if _is_url(path) or _is_s3(path):
        with tempfile.TemporaryDirectory() as tmpdir:
            file_name = Path(path).name
            file_path = os.path.join(tmpdir, file_name)
            transport_params = None

            if _is_s3(path):
                transport_params = get_transport_params(profile_name)

            use_smartopen_es(file_path, path, transport_params)
            with tarfile.open(str(file_path)) as tar:
                # The archive comes from a remote source, so its member paths
                # are validated before anything is written to disk.
                _safe_extract(tar, tmpdir)

            # Deserialize while the temporary directory still exists.
            data_description = read_data_description(tmpdir)
            return description_to_entityset(data_description, **kwargs)
    else:
        data_description = read_data_description(path)
        return description_to_entityset(data_description, **kwargs)