import logging
import os
import warnings
from functools import wraps

import dask.dataframe as dd
import numpy as np
import pandas as pd
import psutil
from woodwork.logical_types import Datetime, Double

from featuretools.entityset.relationship import RelationshipPath
from featuretools.feature_base import AggregationFeature, DirectFeature
from featuretools.utils import Trie
from featuretools.utils.gen_utils import Library
from featuretools.utils.wrangle import _check_time_type, _check_timedelta

logger = logging.getLogger('featuretools.computational_backend')


def bin_cutoff_times(cutoff_time, bin_size):
    binned_cutoff_time = cutoff_time.ww.copy()
    if type(bin_size) == int:
        binned_cutoff_time['time'] = binned_cutoff_time['time'].apply(lambda x: x / bin_size * bin_size)
    else:
        bin_size = _check_timedelta(bin_size)
        binned_cutoff_time['time'] = datetime_round(binned_cutoff_time['time'], bin_size)
    return binned_cutoff_time


def save_csv_decorator(save_progress=None):
    def inner_decorator(method):
        @wraps(method)
        def wrapped(*args, **kwargs):
            if save_progress is None:
                r = method(*args, **kwargs)
            else:
                time = args[0].to_pydatetime()
                file_name = 'ft_' + time.strftime("%Y_%m_%d_%I-%M-%S-%f") + '.csv'
                file_path = os.path.join(save_progress, file_name)
                temp_dir = os.path.join(save_progress, 'temp')
                if not os.path.exists(temp_dir):
                    os.makedirs(temp_dir)
                temp_file_path = os.path.join(temp_dir, file_name)
                r = method(*args, **kwargs)
                r.to_csv(temp_file_path)
                os.rename(temp_file_path, file_path)
            return r
        return wrapped
    return inner_decorator


def datetime_round(dt, freq):
    """
    Round down a Timestamp series to the specified freq.
    """
    if not freq.is_absolute():
        raise ValueError("Unit is relative")

    # TODO: multitemporal units
    all_units = list(freq.times.keys())
    if len(all_units) == 1:
        unit = all_units[0]
        value = freq.times[unit]
        if unit == 'm':
            unit = 't'
        # No support for weeks in datetime.datetime
        if unit == 'w':
            unit = 'd'
            value = value * 7
        freq = str(value) + unit
        return dt.dt.floor(freq)
    else:
        raise ValueError("Frequency cannot have multiple temporal parameters")
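# Illustrative usage sketch (not part of the library): datetime_round floors a
# datetime Series to the single-unit frequency described by a featuretools
# Timedelta. This assumes _check_timedelta accepts a string such as "1 day",
# as featuretools does elsewhere for training windows.
#
#   import pandas as pd
#   times = pd.Series(pd.to_datetime(["2021-01-01 10:37:00", "2021-01-02 23:59:00"]))
#   freq = _check_timedelta("1 day")
#   datetime_round(times, freq)
#   # 0   2021-01-01
#   # 1   2021-01-02
#   # dtype: datetime64[ns]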
def gather_approximate_features(feature_set):
    """
    Find features which can be approximated. Returned as a trie where the values
    are sets of feature names.

    Args:
        feature_set (FeatureSet): Features to search the dependencies of for
            features to approximate.

    Returns:
        Trie[RelationshipPath, set[str]]
    """
    approximate_feature_trie = Trie(default=set, path_constructor=RelationshipPath)

    for feature in feature_set.target_features:
        if feature_set.uses_full_dataframe(feature, check_dependents=True):
            continue

        if isinstance(feature, DirectFeature):
            path = feature.relationship_path
            base_feature = feature.base_features[0]

            while isinstance(base_feature, DirectFeature):
                path = path + base_feature.relationship_path
                base_feature = base_feature.base_features[0]

            if isinstance(base_feature, AggregationFeature):
                node_feature_set = approximate_feature_trie.get_node(path).value
                node_feature_set.add(base_feature.unique_name())

    return approximate_feature_trie


def gen_empty_approx_features_df(approx_features):
    df = pd.DataFrame(columns=[f.get_name() for f in approx_features])
    df.index.name = approx_features[0].dataframe.ww.index
    return df


def n_jobs_to_workers(n_jobs):
    try:
        cpus = len(psutil.Process().cpu_affinity())
    except AttributeError:
        cpus = psutil.cpu_count()

    # Taken from sklearn parallel_backends code
    # https://github.com/scikit-learn/scikit-learn/blob/27bbdb570bac062c71b3bb21b0876fd78adc9f7e/sklearn/externals/joblib/_parallel_backends.py#L120
    if n_jobs < 0:
        workers = max(cpus + 1 + n_jobs, 1)
    else:
        workers = min(n_jobs, cpus)

    assert workers > 0, "Need at least one worker"
    return workers
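# Illustrative sketch (not part of the library): n_jobs_to_workers follows the
# scikit-learn joblib convention linked above, so on a machine where psutil
# reports 8 available CPUs:
#
#   n_jobs_to_workers(4)    # -> 4
#   n_jobs_to_workers(-1)   # -> 8  (use all CPUs)
#   n_jobs_to_workers(-2)   # -> 7  (all CPUs but one)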
def create_client_and_cluster(n_jobs, dask_kwargs, entityset_size):
    Client, LocalCluster = get_client_cluster()

    cluster = None
    if 'cluster' in dask_kwargs:
        cluster = dask_kwargs['cluster']
    else:
        # diagnostics_port sets the default port on which to launch the bokeh web
        # interface; if it is set to None, the web interface will not be launched
        diagnostics_port = None
        if 'diagnostics_port' in dask_kwargs:
            diagnostics_port = dask_kwargs['diagnostics_port']
            del dask_kwargs['diagnostics_port']

        workers = n_jobs_to_workers(n_jobs)
        if n_jobs != -1 and workers < n_jobs:
            warning_string = "{} workers requested, but only {} workers created."
            warning_string = warning_string.format(n_jobs, workers)
            warnings.warn(warning_string)

        # Distributed's default memory_limit for a worker is 'auto'. It calculates the
        # worker memory limit as total virtual memory divided by the number of cores
        # available to the workers (always 1 for the featuretools setup). This means
        # reducing the number of workers does not increase the memory limit for the
        # other workers. The featuretools default is to calculate the memory limit as
        # total virtual memory divided by the number of workers. To use the distributed
        # default memory limit, set dask_kwargs['memory_limit']='auto'
        if 'memory_limit' in dask_kwargs:
            memory_limit = dask_kwargs['memory_limit']
            del dask_kwargs['memory_limit']
        else:
            total_memory = psutil.virtual_memory().total
            memory_limit = int(total_memory / float(workers))

        cluster = LocalCluster(n_workers=workers,
                               threads_per_worker=1,
                               diagnostics_port=diagnostics_port,
                               memory_limit=memory_limit,
                               **dask_kwargs)

        # if the cluster has a bokeh port, notify the user in case it is an unexpected port number
        if diagnostics_port is not None:
            if hasattr(cluster, 'scheduler') and cluster.scheduler:
                info = cluster.scheduler.identity()
                if 'bokeh' in info['services']:
                    msg = "Dashboard started on port {}"
                    print(msg.format(info['services']['bokeh']))

    client = Client(cluster)

    warned_of_memory = False
    for worker in list(client.scheduler_info()['workers'].values()):
        worker_limit = worker['memory_limit']
        if worker_limit < entityset_size:
            raise ValueError("Insufficient memory to use this many workers")
        elif worker_limit < 2 * entityset_size and not warned_of_memory:
            logger.warning("Worker memory is between 1 to 2 times the memory"
                           " size of the EntitySet. If errors occur that do"
                           " not occur with n_jobs equals 1, this may be the "
                           "cause. See https://featuretools.alteryx.com/en/stable/guides/performance.html#parallel-feature-computation"
                           " for more information.")
            warned_of_memory = True

    return client, cluster


def get_client_cluster():
    """
    Separated out the imports to make it easier to mock during testing.
    """
    from distributed import Client, LocalCluster
    return Client, LocalCluster
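# Illustrative sketch (not part of the library): create_client_and_cluster is
# normally driven by the n_jobs and dask_kwargs arguments passed to
# calculate_feature_matrix; the entityset_size value below is hypothetical and
# only feeds the per-worker memory checks above.
#
#   dask_kwargs = {"diagnostics_port": 8787}
#   client, cluster = create_client_and_cluster(
#       n_jobs=2,
#       dask_kwargs=dask_kwargs,
#       entityset_size=10 * 1024 ** 2,  # hypothetical: ~10 MB EntitySet
#   )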
def _validate_cutoff_time(cutoff_time, target_dataframe):
    """
    Verify that the cutoff time is a single value or a pandas dataframe with
    the proper columns containing no duplicate rows.
    """
    if isinstance(cutoff_time, dd.DataFrame):
        msg = "cutoff_time should be a Pandas DataFrame: " \
              "computing cutoff_time, this may take a while"
        warnings.warn(msg)
        cutoff_time = cutoff_time.compute()

    if isinstance(cutoff_time, pd.DataFrame):
        cutoff_time = cutoff_time.reset_index(drop=True)

        if "instance_id" not in cutoff_time.columns:
            if target_dataframe.ww.index not in cutoff_time.columns:
                raise AttributeError('Cutoff time DataFrame must contain a column with either the same name'
                                     ' as the target dataframe index or a column named "instance_id"')
            # rename to instance_id
            cutoff_time.rename(columns={target_dataframe.ww.index: "instance_id"}, inplace=True)

        if "time" not in cutoff_time.columns:
            if target_dataframe.ww.time_index and target_dataframe.ww.time_index not in cutoff_time.columns:
                raise AttributeError('Cutoff time DataFrame must contain a column with either the same name'
                                     ' as the target dataframe time_index or a column named "time"')
            # rename to time
            cutoff_time.rename(columns={target_dataframe.ww.time_index: "time"}, inplace=True)

        # Make sure user supplies only one valid name for instance id and time columns
        if "instance_id" in cutoff_time.columns and target_dataframe.ww.index in cutoff_time.columns and \
                "instance_id" != target_dataframe.ww.index:
            raise AttributeError('Cutoff time DataFrame cannot contain both a column named "instance_id" and a column'
                                 ' with the same name as the target dataframe index')

        if "time" in cutoff_time.columns and target_dataframe.ww.time_index in cutoff_time.columns and \
                "time" != target_dataframe.ww.time_index:
            raise AttributeError('Cutoff time DataFrame cannot contain both a column named "time" and a column'
                                 ' with the same name as the target dataframe time index')

        assert (cutoff_time[['instance_id', 'time']].duplicated().sum() == 0), \
            "Duplicated rows in cutoff time dataframe."
    else:
        if isinstance(cutoff_time, list):
            raise TypeError("cutoff_time must be a single value or DataFrame")

    return cutoff_time


def _check_cutoff_time_type(cutoff_time, es_time_type):
    """
    Check that the cutoff time values are of the proper type given the entityset time type.
    """
    # Check that cutoff_time time type matches entityset time type
    if isinstance(cutoff_time, tuple):
        cutoff_time_value = cutoff_time[0]
        time_type = _check_time_type(cutoff_time_value)
        is_numeric = time_type == 'numeric'
        is_datetime = time_type == Datetime
    else:
        cutoff_time_col = cutoff_time.ww['time']
        is_numeric = cutoff_time_col.ww.schema.is_numeric
        is_datetime = cutoff_time_col.ww.schema.is_datetime

    if es_time_type == "numeric" and not is_numeric:
        raise TypeError("cutoff_time times must be numeric: try casting "
                        "via pd.to_numeric()")
    if es_time_type == Datetime and not is_datetime:
        raise TypeError("cutoff_time times must be datetime type: try casting "
                        "via pd.to_datetime()")


def replace_inf_values(feature_matrix, replacement_value=np.nan, columns=None):
    """Replace all ``np.inf`` values in a feature matrix with the specified replacement value.

    Args:
        feature_matrix (DataFrame): DataFrame whose columns are feature names and rows are instances
        replacement_value (int, float, str, optional): Value with which ``np.inf`` values will be replaced
        columns (list[str], optional): A list specifying which columns should have values replaced.
            If None, values will be replaced for all columns.

    Returns:
        feature_matrix
    """
    if columns is None:
        feature_matrix = feature_matrix.replace([np.inf, -np.inf], replacement_value)
    else:
        feature_matrix[columns] = feature_matrix[columns].replace([np.inf, -np.inf], replacement_value)
    return feature_matrix
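# Illustrative sketch (not part of the library): replacing infinities that can be
# produced by, for example, divide-by-zero features.
#
#   import numpy as np
#   import pandas as pd
#   fm = pd.DataFrame({"ratio": [1.0, np.inf, -np.inf]})
#   replace_inf_values(fm)                                           # +/-inf -> NaN
#   replace_inf_values(fm, replacement_value=0, columns=["ratio"])   # +/-inf -> 0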
def get_ww_types_from_features(features, entityset, pass_columns=None, cutoff_time=None):
    """Given a list of features and an entityset (and optionally a list of pass-through
    columns and the cutoff time dataframe), returns the logical types, semantic tags,
    and origin of each column in the feature matrix.

    Both pass_columns and cutoff_time need to be supplied in order to get the
    type information for the pass-through columns.
    """
    if pass_columns is None:
        pass_columns = []

    logical_types = {}
    semantic_tags = {}
    origins = {}

    for feature in features:
        names = feature.get_feature_names()
        for name in names:
            logical_types[name] = feature.column_schema.logical_type
            semantic_tags[name] = feature.column_schema.semantic_tags.copy()
            semantic_tags[name] -= {'index', 'time_index'}
            if logical_types[name] is None and "numeric" in semantic_tags[name]:
                logical_types[name] = Double
            if all([f.primitive is None for f in feature.get_dependencies(deep=True)]):
                origins[name] = "base"
            else:
                origins[name] = "engineered"

    if pass_columns:
        cutoff_schema = cutoff_time.ww.schema
        for column in pass_columns:
            logical_types[column] = cutoff_schema.logical_types[column]
            semantic_tags[column] = cutoff_schema.semantic_tags[column]
            origins[column] = "base"

    if entityset.dataframe_type in (Library.DASK.value, Library.KOALAS.value):
        target_dataframe_name = features[0].dataframe_name
        table_schema = entityset[target_dataframe_name].ww.schema
        index_col = table_schema.index
        logical_types[index_col] = table_schema.logical_types[index_col]
        semantic_tags[index_col] = table_schema.semantic_tags[index_col]
        semantic_tags[index_col] -= {"index"}
        origins[index_col] = "base"

    ww_init = {
        "logical_types": logical_types,
        "semantic_tags": semantic_tags,
        "column_origins": origins
    }
    return ww_init
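# Illustrative sketch (not part of the library): the dictionary returned by
# get_ww_types_from_features is shaped so it can be unpacked into a Woodwork
# initialization of the finished feature matrix, e.g.:
#
#   ww_init = get_ww_types_from_features(features, es, pass_columns, cutoff_time)
#   feature_matrix.ww.init(**ww_init)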