NOTICE

The upcoming release of Featuretools 1.0.0 contains several breaking changes. Users are encouraged to test this version prior to release by installing from GitHub:

pip install https://github.com/alteryx/featuretools/archive/woodwork-integration.zip

For details on migrating to the new version, refer to Transitioning to Featuretools Version 1.0. Please report any issues in the Featuretools GitHub repo or by messaging in Alteryx Open Source Slack.


Source code for featuretools.computational_backends.utils

import logging
import os
import warnings
from functools import wraps

import dask.dataframe as dd
import numpy as np
import pandas as pd
import psutil

from featuretools.entityset.relationship import RelationshipPath
from featuretools.feature_base import AggregationFeature, DirectFeature
from featuretools.utils import Trie
from featuretools.utils.wrangle import _check_time_type, _check_timedelta
from featuretools.variable_types import (
    DatetimeTimeIndex,
    NumericTimeIndex,
    PandasTypes
)

logger = logging.getLogger('featuretools.computational_backend')


def bin_cutoff_times(cutoff_time, bin_size):
    binned_cutoff_time = cutoff_time.copy()
    if type(bin_size) == int:
        binned_cutoff_time['time'] = binned_cutoff_time['time'].apply(lambda x: x / bin_size * bin_size)
    else:
        bin_size = _check_timedelta(bin_size)
        binned_cutoff_time['time'] = datetime_round(binned_cutoff_time['time'], bin_size)
    return binned_cutoff_time


def save_csv_decorator(save_progress=None):
    def inner_decorator(method):
        @wraps(method)
        def wrapped(*args, **kwargs):
            if save_progress is None:
                r = method(*args, **kwargs)
            else:
                time = args[0].to_pydatetime()
                file_name = 'ft_' + time.strftime("%Y_%m_%d_%I-%M-%S-%f") + '.csv'
                file_path = os.path.join(save_progress, file_name)
                temp_dir = os.path.join(save_progress, 'temp')
                if not os.path.exists(temp_dir):
                    os.makedirs(temp_dir)
                temp_file_path = os.path.join(temp_dir, file_name)
                r = method(*args, **kwargs)
                r.to_csv(temp_file_path)
                os.rename(temp_file_path, file_path)
            return r
        return wrapped
    return inner_decorator


def datetime_round(dt, freq):
    """
    round down Timestamp series to a specified freq
    """
    if not freq.is_absolute():
        raise ValueError("Unit is relative")

    # TODO: multitemporal units
    all_units = list(freq.times.keys())
    if len(all_units) == 1:
        unit = all_units[0]
        value = freq.times[unit]
        if unit == 'm':
            unit = 't'
        # No support for weeks in datetime.datetime
        if unit == 'w':
            unit = 'd'
            value = value * 7
        freq = str(value) + unit
        return dt.dt.floor(freq)
    else:
        assert "Frequency cannot have multiple temporal parameters"


def gather_approximate_features(feature_set):
    """
    Find features which can be approximated. Returned as a trie where the values
    are sets of feature names.

    Args:
        feature_set (FeatureSet): Features to search the dependencies of for
            features to approximate.

    Returns:
        Trie[RelationshipPath, set[str]]
    """
    approximate_feature_trie = Trie(default=set, path_constructor=RelationshipPath)

    for feature in feature_set.target_features:
        if feature_set.uses_full_entity(feature, check_dependents=True):
            continue

        if isinstance(feature, DirectFeature):
            path = feature.relationship_path
            base_feature = feature.base_features[0]

            while isinstance(base_feature, DirectFeature):
                path = path + base_feature.relationship_path
                base_feature = base_feature.base_features[0]

            if isinstance(base_feature, AggregationFeature):
                node_feature_set = approximate_feature_trie.get_node(path).value
                node_feature_set.add(base_feature.unique_name())

    return approximate_feature_trie


def gen_empty_approx_features_df(approx_features):
    df = pd.DataFrame(columns=[f.get_name() for f in approx_features])
    df.index.name = approx_features[0].entity.index
    return df


def n_jobs_to_workers(n_jobs):
    try:
        cpus = len(psutil.Process().cpu_affinity())
    except AttributeError:
        cpus = psutil.cpu_count()

    # Taken from sklearn parallel_backends code
    # https://github.com/scikit-learn/scikit-learn/blob/27bbdb570bac062c71b3bb21b0876fd78adc9f7e/sklearn/externals/joblib/_parallel_backends.py#L120
    if n_jobs < 0:
        workers = max(cpus + 1 + n_jobs, 1)
    else:
        workers = min(n_jobs, cpus)

    assert workers > 0, "Need at least one worker"
    return workers


def create_client_and_cluster(n_jobs, dask_kwargs, entityset_size):
    Client, LocalCluster = get_client_cluster()

    cluster = None
    if 'cluster' in dask_kwargs:
        cluster = dask_kwargs['cluster']
    else:
        # diagnostics_port sets the default port to launch bokeh web interface
        # if it is set to None web interface will not be launched
        diagnostics_port = None
        if 'diagnostics_port' in dask_kwargs:
            diagnostics_port = dask_kwargs['diagnostics_port']
            del dask_kwargs['diagnostics_port']

        workers = n_jobs_to_workers(n_jobs)
        if n_jobs != -1 and workers < n_jobs:
            warning_string = "{} workers requested, but only {} workers created."
            warning_string = warning_string.format(n_jobs, workers)
            warnings.warn(warning_string)

        # Distributed default memory_limit for worker is 'auto'. It calculates worker
        # memory limit as total virtual memory divided by the number
        # of cores available to the workers (alwasy 1 for featuretools setup).
        # This means reducing the number of workers does not increase the memory
        # limit for other workers.  Featuretools default is to calculate memory limit
        # as total virtual memory divided by number of workers. To use distributed
        # default memory limit, set dask_kwargs['memory_limit']='auto'
        if 'memory_limit' in dask_kwargs:
            memory_limit = dask_kwargs['memory_limit']
            del dask_kwargs['memory_limit']
        else:
            total_memory = psutil.virtual_memory().total
            memory_limit = int(total_memory / float(workers))

        cluster = LocalCluster(n_workers=workers,
                               threads_per_worker=1,
                               diagnostics_port=diagnostics_port,
                               memory_limit=memory_limit,
                               **dask_kwargs)

        # if cluster has bokeh port, notify user if unexpected port number
        if diagnostics_port is not None:
            if hasattr(cluster, 'scheduler') and cluster.scheduler:
                info = cluster.scheduler.identity()
                if 'bokeh' in info['services']:
                    msg = "Dashboard started on port {}"
                    print(msg.format(info['services']['bokeh']))

    client = Client(cluster)

    warned_of_memory = False
    for worker in list(client.scheduler_info()['workers'].values()):
        worker_limit = worker['memory_limit']
        if worker_limit < entityset_size:
            raise ValueError("Insufficient memory to use this many workers")
        elif worker_limit < 2 * entityset_size and not warned_of_memory:
            logger.warning("Worker memory is between 1 to 2 times the memory"
                           " size of the EntitySet. If errors occur that do"
                           " not occur with n_jobs equals 1, this may be the "
                           "cause.  See https://featuretools.alteryx.com/en/stable/guides/performance.html#parallel-feature-computation"
                           " for more information.")
            warned_of_memory = True

    return client, cluster


def get_client_cluster():
    """
    Separated out the imports to make it easier to mock during testing
    """
    from distributed import Client, LocalCluster
    return Client, LocalCluster


def _validate_cutoff_time(cutoff_time, target_entity):
    """
    Verify that the cutoff time is a single value or a pandas dataframe with the proper columns
    containing no duplicate rows
    """
    if isinstance(cutoff_time, dd.DataFrame):
        msg = "cutoff_time should be a Pandas DataFrame: "\
            "computing cutoff_time, this may take a while"
        warnings.warn(msg)
        cutoff_time = cutoff_time.compute()

    if isinstance(cutoff_time, pd.DataFrame):
        cutoff_time = cutoff_time.reset_index(drop=True)

        if "instance_id" not in cutoff_time.columns:
            if target_entity.index not in cutoff_time.columns:
                raise AttributeError('Cutoff time DataFrame must contain a column with either the same name'
                                     ' as the target entity index or a column named "instance_id"')
            # rename to instance_id
            cutoff_time.rename(columns={target_entity.index: "instance_id"}, inplace=True)

        if "time" not in cutoff_time.columns:
            if target_entity.time_index and target_entity.time_index not in cutoff_time.columns:
                raise AttributeError('Cutoff time DataFrame must contain a column with either the same name'
                                     ' as the target entity time_index or a column named "time"')
            # rename to time
            cutoff_time.rename(columns={target_entity.time_index: "time"}, inplace=True)

        # Make sure user supplies only one valid name for instance id and time columns
        if "instance_id" in cutoff_time.columns and target_entity.index in cutoff_time.columns and \
                "instance_id" != target_entity.index:
            raise AttributeError('Cutoff time DataFrame cannot contain both a column named "instance_id" and a column'
                                 ' with the same name as the target entity index')
        if "time" in cutoff_time.columns and target_entity.time_index in cutoff_time.columns and \
                "time" != target_entity.time_index:
            raise AttributeError('Cutoff time DataFrame cannot contain both a column named "time" and a column'
                                 ' with the same name as the target entity time index')

        assert (cutoff_time[['instance_id', 'time']].duplicated().sum() == 0), \
            "Duplicated rows in cutoff time dataframe."
    else:
        if isinstance(cutoff_time, list):
            raise TypeError("cutoff_time must be a single value or DataFrame")

    return cutoff_time


def _check_cutoff_time_type(cutoff_time, es_time_type):
    """
    Check that the cutoff time values are of the proper type given the entityset time type
    """
    # Check that cutoff_time time type matches entityset time type
    if isinstance(cutoff_time, tuple):
        cutoff_time_value = cutoff_time[0]
        time_type = _check_time_type(cutoff_time_value)
        is_numeric = time_type == NumericTimeIndex
        is_datetime = time_type == DatetimeTimeIndex
    else:
        cutoff_time_dtype = cutoff_time['time'].dtype.name
        is_numeric = cutoff_time_dtype in PandasTypes._pandas_numerics
        is_datetime = cutoff_time_dtype in PandasTypes._pandas_datetimes

    if es_time_type == NumericTimeIndex and not is_numeric:
        raise TypeError("cutoff_time times must be numeric: try casting "
                        "via pd.to_numeric()")
    if es_time_type == DatetimeTimeIndex and not is_datetime:
        raise TypeError("cutoff_time times must be datetime type: try casting "
                        "via pd.to_datetime()")


[docs]def replace_inf_values(feature_matrix, replacement_value=np.nan, columns=None): """Replace all ``np.inf`` values in a feature matrix with the specified replacement value. Args: feature_matrix (DataFrame): DataFrame whose columns are feature names and rows are instances replacement_value (int, float, str, optional): Value with which ``np.inf`` values will be replaced columns (list[str], optional): A list specifying which columns should have values replaced. If None, values will be replaced for all columns. Returns: feature_matrix """ if columns is None: feature_matrix = feature_matrix.replace([np.inf, -np.inf], replacement_value) else: feature_matrix[columns] = feature_matrix[columns].replace([np.inf, -np.inf], replacement_value) return feature_matrix