Source code for featuretools.computational_backends.utils

import logging
import os
import warnings
from functools import wraps

import dask.dataframe as dd
import numpy as np
import pandas as pd
import psutil
from woodwork.logical_types import Datetime, Double

from featuretools.entityset.relationship import RelationshipPath
from featuretools.feature_base import AggregationFeature, DirectFeature
from featuretools.utils import Trie
from featuretools.utils.gen_utils import Library
from featuretools.utils.wrangle import _check_time_type, _check_timedelta

logger = logging.getLogger("featuretools.computational_backend")


def bin_cutoff_times(cutoff_time, bin_size):
    binned_cutoff_time = cutoff_time.ww.copy()
    if type(bin_size) == int:
        binned_cutoff_time["time"] = binned_cutoff_time["time"].apply(
            lambda x: x / bin_size * bin_size
        )
    else:
        bin_size = _check_timedelta(bin_size)
        binned_cutoff_time["time"] = datetime_round(
            binned_cutoff_time["time"], bin_size
        )
    return binned_cutoff_time


def save_csv_decorator(save_progress=None):
    def inner_decorator(method):
        @wraps(method)
        def wrapped(*args, **kwargs):
            if save_progress is None:
                r = method(*args, **kwargs)
            else:
                time = args[0].to_pydatetime()
                file_name = "ft_" + time.strftime("%Y_%m_%d_%I-%M-%S-%f") + ".csv"
                file_path = os.path.join(save_progress, file_name)
                temp_dir = os.path.join(save_progress, "temp")
                if not os.path.exists(temp_dir):
                    os.makedirs(temp_dir)
                temp_file_path = os.path.join(temp_dir, file_name)
                r = method(*args, **kwargs)
                r.to_csv(temp_file_path)
                os.rename(temp_file_path, file_path)
            return r

        return wrapped

    return inner_decorator


def datetime_round(dt, freq):
    """
    round down Timestamp series to a specified freq
    """
    if not freq.is_absolute():
        raise ValueError("Unit is relative")

    # TODO: multitemporal units
    all_units = list(freq.times.keys())
    if len(all_units) == 1:
        unit = all_units[0]
        value = freq.times[unit]
        if unit == "m":
            unit = "t"
        # No support for weeks in datetime.datetime
        if unit == "w":
            unit = "d"
            value = value * 7
        freq = str(value) + unit
        return dt.dt.floor(freq)
    else:
        assert "Frequency cannot have multiple temporal parameters"


def gather_approximate_features(feature_set):
    """
    Find features which can be approximated. Returned as a trie where the values
    are sets of feature names.

    Args:
        feature_set (FeatureSet): Features to search the dependencies of for
            features to approximate.

    Returns:
        Trie[RelationshipPath, set[str]]
    """
    approximate_feature_trie = Trie(default=set, path_constructor=RelationshipPath)

    for feature in feature_set.target_features:
        if feature_set.uses_full_dataframe(feature, check_dependents=True):
            continue

        if isinstance(feature, DirectFeature):
            path = feature.relationship_path
            base_feature = feature.base_features[0]

            while isinstance(base_feature, DirectFeature):
                path = path + base_feature.relationship_path
                base_feature = base_feature.base_features[0]

            if isinstance(base_feature, AggregationFeature):
                node_feature_set = approximate_feature_trie.get_node(path).value
                node_feature_set.add(base_feature.unique_name())

    return approximate_feature_trie


def gen_empty_approx_features_df(approx_features):
    df = pd.DataFrame(columns=[f.get_name() for f in approx_features])
    df.index.name = approx_features[0].dataframe.ww.index
    return df


def n_jobs_to_workers(n_jobs):
    try:
        cpus = len(psutil.Process().cpu_affinity())
    except AttributeError:
        cpus = psutil.cpu_count()

    # Taken from sklearn parallel_backends code
    # https://github.com/scikit-learn/scikit-learn/blob/27bbdb570bac062c71b3bb21b0876fd78adc9f7e/sklearn/externals/joblib/_parallel_backends.py#L120
    if n_jobs < 0:
        workers = max(cpus + 1 + n_jobs, 1)
    else:
        workers = min(n_jobs, cpus)

    assert workers > 0, "Need at least one worker"
    return workers


def create_client_and_cluster(n_jobs, dask_kwargs, entityset_size):
    Client, LocalCluster = get_client_cluster()

    cluster = None
    if "cluster" in dask_kwargs:
        cluster = dask_kwargs["cluster"]
    else:
        # diagnostics_port sets the default port to launch bokeh web interface
        # if it is set to None web interface will not be launched
        diagnostics_port = None
        if "diagnostics_port" in dask_kwargs:
            diagnostics_port = dask_kwargs["diagnostics_port"]
            del dask_kwargs["diagnostics_port"]

        workers = n_jobs_to_workers(n_jobs)
        if n_jobs != -1 and workers < n_jobs:
            warning_string = "{} workers requested, but only {} workers created."
            warning_string = warning_string.format(n_jobs, workers)
            warnings.warn(warning_string)

        # Distributed default memory_limit for worker is 'auto'. It calculates worker
        # memory limit as total virtual memory divided by the number
        # of cores available to the workers (alwasy 1 for featuretools setup).
        # This means reducing the number of workers does not increase the memory
        # limit for other workers.  Featuretools default is to calculate memory limit
        # as total virtual memory divided by number of workers. To use distributed
        # default memory limit, set dask_kwargs['memory_limit']='auto'
        if "memory_limit" in dask_kwargs:
            memory_limit = dask_kwargs["memory_limit"]
            del dask_kwargs["memory_limit"]
        else:
            total_memory = psutil.virtual_memory().total
            memory_limit = int(total_memory / float(workers))

        cluster = LocalCluster(
            n_workers=workers,
            threads_per_worker=1,
            diagnostics_port=diagnostics_port,
            memory_limit=memory_limit,
            **dask_kwargs,
        )

        # if cluster has bokeh port, notify user if unexpected port number
        if diagnostics_port is not None:
            if hasattr(cluster, "scheduler") and cluster.scheduler:
                info = cluster.scheduler.identity()
                if "bokeh" in info["services"]:
                    msg = "Dashboard started on port {}"
                    print(msg.format(info["services"]["bokeh"]))

    client = Client(cluster)

    warned_of_memory = False
    for worker in list(client.scheduler_info()["workers"].values()):
        worker_limit = worker["memory_limit"]
        if worker_limit < entityset_size:
            raise ValueError("Insufficient memory to use this many workers")
        elif worker_limit < 2 * entityset_size and not warned_of_memory:
            logger.warning(
                "Worker memory is between 1 to 2 times the memory"
                " size of the EntitySet. If errors occur that do"
                " not occur with n_jobs equals 1, this may be the "
                "cause.  See https://featuretools.alteryx.com/en/stable/guides/performance.html#parallel-feature-computation"
                " for more information."
            )
            warned_of_memory = True

    return client, cluster


def get_client_cluster():
    """
    Separated out the imports to make it easier to mock during testing
    """
    from distributed import Client, LocalCluster

    return Client, LocalCluster


def _validate_cutoff_time(cutoff_time, target_dataframe):
    """
    Verify that the cutoff time is a single value or a pandas dataframe with the proper columns
    containing no duplicate rows
    """
    if isinstance(cutoff_time, dd.DataFrame):
        msg = (
            "cutoff_time should be a Pandas DataFrame: "
            "computing cutoff_time, this may take a while"
        )
        warnings.warn(msg)
        cutoff_time = cutoff_time.compute()

    if isinstance(cutoff_time, pd.DataFrame):
        cutoff_time = cutoff_time.reset_index(drop=True)

        if "instance_id" not in cutoff_time.columns:
            if target_dataframe.ww.index not in cutoff_time.columns:
                raise AttributeError(
                    "Cutoff time DataFrame must contain a column with either the same name"
                    ' as the target dataframe index or a column named "instance_id"'
                )
            # rename to instance_id
            cutoff_time.rename(
                columns={target_dataframe.ww.index: "instance_id"}, inplace=True
            )

        if "time" not in cutoff_time.columns:
            if (
                target_dataframe.ww.time_index
                and target_dataframe.ww.time_index not in cutoff_time.columns
            ):
                raise AttributeError(
                    "Cutoff time DataFrame must contain a column with either the same name"
                    ' as the target dataframe time_index or a column named "time"'
                )
            # rename to time
            cutoff_time.rename(
                columns={target_dataframe.ww.time_index: "time"}, inplace=True
            )

        # Make sure user supplies only one valid name for instance id and time columns
        if (
            "instance_id" in cutoff_time.columns
            and target_dataframe.ww.index in cutoff_time.columns
            and "instance_id" != target_dataframe.ww.index
        ):
            raise AttributeError(
                'Cutoff time DataFrame cannot contain both a column named "instance_id" and a column'
                " with the same name as the target dataframe index"
            )
        if (
            "time" in cutoff_time.columns
            and target_dataframe.ww.time_index in cutoff_time.columns
            and "time" != target_dataframe.ww.time_index
        ):
            raise AttributeError(
                'Cutoff time DataFrame cannot contain both a column named "time" and a column'
                " with the same name as the target dataframe time index"
            )

        assert (
            cutoff_time[["instance_id", "time"]].duplicated().sum() == 0
        ), "Duplicated rows in cutoff time dataframe."
    else:
        if isinstance(cutoff_time, list):
            raise TypeError("cutoff_time must be a single value or DataFrame")

    return cutoff_time


def _check_cutoff_time_type(cutoff_time, es_time_type):
    """
    Check that the cutoff time values are of the proper type given the entityset time type
    """
    # Check that cutoff_time time type matches entityset time type
    if isinstance(cutoff_time, tuple):
        cutoff_time_value = cutoff_time[0]
        time_type = _check_time_type(cutoff_time_value)
        is_numeric = time_type == "numeric"
        is_datetime = time_type == Datetime
    else:
        cutoff_time_col = cutoff_time.ww["time"]
        is_numeric = cutoff_time_col.ww.schema.is_numeric
        is_datetime = cutoff_time_col.ww.schema.is_datetime

    if es_time_type == "numeric" and not is_numeric:
        raise TypeError(
            "cutoff_time times must be numeric: try casting " "via pd.to_numeric()"
        )
    if es_time_type == Datetime and not is_datetime:
        raise TypeError(
            "cutoff_time times must be datetime type: try casting "
            "via pd.to_datetime()"
        )


[docs]def replace_inf_values(feature_matrix, replacement_value=np.nan, columns=None): """Replace all ``np.inf`` values in a feature matrix with the specified replacement value. Args: feature_matrix (DataFrame): DataFrame whose columns are feature names and rows are instances replacement_value (int, float, str, optional): Value with which ``np.inf`` values will be replaced columns (list[str], optional): A list specifying which columns should have values replaced. If None, values will be replaced for all columns. Returns: feature_matrix """ if columns is None: feature_matrix = feature_matrix.replace([np.inf, -np.inf], replacement_value) else: feature_matrix[columns] = feature_matrix[columns].replace( [np.inf, -np.inf], replacement_value ) return feature_matrix
def get_ww_types_from_features( features, entityset, pass_columns=None, cutoff_time=None ): """Given a list of features and entityset (and optionally a list of pass through columns and the cutoff time dataframe), returns the logical types, semantic tags,and origin of each column in the feature matrix. Both pass_columns and cutoff_time will need to be supplied in order to get the type information for the pass through columns """ if pass_columns is None: pass_columns = [] logical_types = {} semantic_tags = {} origins = {} for feature in features: names = feature.get_feature_names() for name in names: logical_types[name] = feature.column_schema.logical_type semantic_tags[name] = feature.column_schema.semantic_tags.copy() semantic_tags[name] -= {"index", "time_index"} if logical_types[name] is None and "numeric" in semantic_tags[name]: logical_types[name] = Double if all([f.primitive is None for f in feature.get_dependencies(deep=True)]): origins[name] = "base" else: origins[name] = "engineered" if pass_columns: cutoff_schema = cutoff_time.ww.schema for column in pass_columns: logical_types[column] = cutoff_schema.logical_types[column] semantic_tags[column] = cutoff_schema.semantic_tags[column] origins[column] = "base" if entityset.dataframe_type in (Library.DASK.value, Library.SPARK.value): target_dataframe_name = features[0].dataframe_name table_schema = entityset[target_dataframe_name].ww.schema index_col = table_schema.index logical_types[index_col] = table_schema.logical_types[index_col] semantic_tags[index_col] = table_schema.semantic_tags[index_col] semantic_tags[index_col] -= {"index"} origins[index_col] = "base" ww_init = { "logical_types": logical_types, "semantic_tags": semantic_tags, "column_origins": origins, } return ww_init