import logging
import os
import typing
import warnings
from datetime import datetime
from functools import wraps
import dask.dataframe as dd
import numpy as np
import pandas as pd
import psutil
from woodwork.logical_types import Datetime, Double
from featuretools.entityset.relationship import RelationshipPath
from featuretools.feature_base import AggregationFeature, DirectFeature
from featuretools.utils import Trie
from featuretools.utils.gen_utils import Library
from featuretools.utils.wrangle import _check_time_type, _check_timedelta
logger = logging.getLogger("featuretools.computational_backend")
def bin_cutoff_times(cutoff_time, bin_size):
binned_cutoff_time = cutoff_time.ww.copy()
    # with an integer bin size, floor each numeric time down to the start of its bin
    if isinstance(bin_size, int):
        binned_cutoff_time["time"] = binned_cutoff_time["time"].apply(
            lambda x: x // bin_size * bin_size,
        )
else:
bin_size = _check_timedelta(bin_size)
binned_cutoff_time["time"] = datetime_round(
binned_cutoff_time["time"],
bin_size,
)
return binned_cutoff_time
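# Example usage of bin_cutoff_times (illustrative sketch; assumes a Woodwork-initialized
# cutoff time DataFrame with integer "time" values):
#
#   ct = pd.DataFrame({"instance_id": [0, 1], "time": [13, 27]})
#   ct.ww.init()
#   bin_cutoff_times(ct, 10)   # "time" binned down to 10 and 20
#
# With a non-integer bin_size (e.g. a featuretools Timedelta), times are floored via
# datetime_round instead.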
def save_csv_decorator(save_progress=None):
def inner_decorator(method):
@wraps(method)
def wrapped(*args, **kwargs):
if save_progress is None:
r = method(*args, **kwargs)
else:
time = args[0].to_pydatetime()
file_name = "ft_" + time.strftime("%Y_%m_%d_%I-%M-%S-%f") + ".csv"
file_path = os.path.join(save_progress, file_name)
temp_dir = os.path.join(save_progress, "temp")
if not os.path.exists(temp_dir):
os.makedirs(temp_dir)
temp_file_path = os.path.join(temp_dir, file_name)
r = method(*args, **kwargs)
r.to_csv(temp_file_path)
os.rename(temp_file_path, file_path)
return r
return wrapped
return inner_decorator
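# Example usage of save_csv_decorator (illustrative sketch; the decorated function name
# and signature are hypothetical stand-ins for the internal per-cutoff-time calculation,
# whose first positional argument must be a Timestamp-like value):
#
#   @save_csv_decorator(save_progress="/tmp/ft_progress")
#   def calc_results(cutoff_time, chunk):
#       ...  # returns a DataFrame
#
# Each call writes its returned DataFrame to /tmp/ft_progress/ft_<timestamp>.csv, staging
# the file in a "temp" subdirectory first so partially written CSVs never appear in the
# save_progress directory.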
def datetime_round(dt, freq):
"""
round down Timestamp series to a specified freq
"""
if not freq.is_absolute():
raise ValueError("Unit is relative")
# TODO: multitemporal units
all_units = list(freq.times.keys())
if len(all_units) == 1:
unit = all_units[0]
value = freq.times[unit]
if unit == "m":
unit = "t"
# No support for weeks in datetime.datetime
if unit == "w":
unit = "d"
value = value * 7
freq = str(value) + unit
return dt.dt.floor(freq)
else:
assert "Frequency cannot have multiple temporal parameters"
def gather_approximate_features(feature_set):
"""
Find features which can be approximated. Returned as a trie where the values
are sets of feature names.
Args:
feature_set (FeatureSet): Features to search the dependencies of for
features to approximate.
Returns:
Trie[RelationshipPath, set[str]]
"""
approximate_feature_trie = Trie(default=set, path_constructor=RelationshipPath)
for feature in feature_set.target_features:
if feature_set.uses_full_dataframe(feature, check_dependents=True):
continue
if isinstance(feature, DirectFeature):
path = feature.relationship_path
base_feature = feature.base_features[0]
while isinstance(base_feature, DirectFeature):
path = path + base_feature.relationship_path
base_feature = base_feature.base_features[0]
if isinstance(base_feature, AggregationFeature):
node_feature_set = approximate_feature_trie.get_node(path).value
node_feature_set.add(base_feature.unique_name())
return approximate_feature_trie
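# In the trie returned by gather_approximate_features, each node's value is the set of
# unique names of AggregationFeatures reached by stacking DirectFeatures along that
# node's relationship path; these are the features that can be computed at binned
# (approximate) cutoff times instead of at every exact cutoff time.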
def gen_empty_approx_features_df(approx_features):
df = pd.DataFrame(columns=[f.get_name() for f in approx_features])
df.index.name = approx_features[0].dataframe.ww.index
return df
def n_jobs_to_workers(n_jobs):
try:
cpus = len(psutil.Process().cpu_affinity())
except AttributeError:
cpus = psutil.cpu_count()
# Taken from sklearn parallel_backends code
# https://github.com/scikit-learn/scikit-learn/blob/27bbdb570bac062c71b3bb21b0876fd78adc9f7e/sklearn/externals/joblib/_parallel_backends.py#L120
if n_jobs < 0:
workers = max(cpus + 1 + n_jobs, 1)
else:
workers = min(n_jobs, cpus)
assert workers > 0, "Need at least one worker"
return workers
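# Example (illustrative; assumes 8 CPUs are available to this process):
#   n_jobs_to_workers(4)    # -> 4
#   n_jobs_to_workers(-1)   # -> 8 (all CPUs)
#   n_jobs_to_workers(-2)   # -> 7 (all but one)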
def create_client_and_cluster(n_jobs, dask_kwargs, entityset_size):
Client, LocalCluster = get_client_cluster()
cluster = None
if "cluster" in dask_kwargs:
cluster = dask_kwargs["cluster"]
else:
        # diagnostics_port sets the port used to launch the bokeh web interface;
        # if it is set to None, the web interface will not be launched
diagnostics_port = None
if "diagnostics_port" in dask_kwargs:
diagnostics_port = dask_kwargs["diagnostics_port"]
del dask_kwargs["diagnostics_port"]
workers = n_jobs_to_workers(n_jobs)
if n_jobs != -1 and workers < n_jobs:
warning_string = "{} workers requested, but only {} workers created."
warning_string = warning_string.format(n_jobs, workers)
warnings.warn(warning_string)
# Distributed default memory_limit for worker is 'auto'. It calculates worker
# memory limit as total virtual memory divided by the number
        # of cores available to the workers (always 1 for featuretools setup).
# This means reducing the number of workers does not increase the memory
# limit for other workers. Featuretools default is to calculate memory limit
# as total virtual memory divided by number of workers. To use distributed
# default memory limit, set dask_kwargs['memory_limit']='auto'
if "memory_limit" in dask_kwargs:
memory_limit = dask_kwargs["memory_limit"]
del dask_kwargs["memory_limit"]
else:
total_memory = psutil.virtual_memory().total
memory_limit = int(total_memory / float(workers))
cluster = LocalCluster(
n_workers=workers,
threads_per_worker=1,
diagnostics_port=diagnostics_port,
memory_limit=memory_limit,
**dask_kwargs,
)
        # if the cluster has a bokeh port, notify the user of the actual dashboard port
if diagnostics_port is not None:
if hasattr(cluster, "scheduler") and cluster.scheduler:
info = cluster.scheduler.identity()
if "bokeh" in info["services"]:
msg = "Dashboard started on port {}"
print(msg.format(info["services"]["bokeh"]))
client = Client(cluster)
warned_of_memory = False
for worker in list(client.scheduler_info()["workers"].values()):
worker_limit = worker["memory_limit"]
if worker_limit < entityset_size:
raise ValueError("Insufficient memory to use this many workers")
elif worker_limit < 2 * entityset_size and not warned_of_memory:
logger.warning(
"Worker memory is between 1 to 2 times the memory"
" size of the EntitySet. If errors occur that do"
" not occur with n_jobs equals 1, this may be the "
"cause. See https://featuretools.alteryx.com/en/stable/guides/performance.html#parallel-feature-computation"
" for more information.",
)
warned_of_memory = True
return client, cluster
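# Example usage of create_client_and_cluster (illustrative sketch; the values shown are
# placeholders and entityset_size_in_bytes is a stand-in for the EntitySet's size):
#
#   client, cluster = create_client_and_cluster(
#       n_jobs=2,
#       dask_kwargs={"diagnostics_port": 8787, "memory_limit": "4GB"},
#       entityset_size=entityset_size_in_bytes,
#   )
#
# Passing dask_kwargs={"cluster": existing_cluster} reuses an already-running Dask
# cluster instead of starting a new LocalCluster.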
def get_client_cluster():
"""
Separated out the imports to make it easier to mock during testing
"""
from distributed import Client, LocalCluster
return Client, LocalCluster
def _validate_cutoff_time(
cutoff_time: typing.Union[dd.DataFrame, pd.DataFrame, str, datetime],
target_dataframe,
):
"""
Verify that the cutoff time is a single value or a pandas dataframe with the proper columns
containing no duplicate rows
"""
if isinstance(cutoff_time, dd.DataFrame):
msg = (
"cutoff_time should be a Pandas DataFrame: "
"computing cutoff_time, this may take a while"
)
warnings.warn(msg)
cutoff_time = cutoff_time.compute()
if isinstance(cutoff_time, pd.DataFrame):
cutoff_time = cutoff_time.reset_index(drop=True)
if "instance_id" not in cutoff_time.columns:
if target_dataframe.ww.index not in cutoff_time.columns:
raise AttributeError(
"Cutoff time DataFrame must contain a column with either the same name"
' as the target dataframe index or a column named "instance_id"',
)
# rename to instance_id
cutoff_time.rename(
columns={target_dataframe.ww.index: "instance_id"},
inplace=True,
)
if "time" not in cutoff_time.columns:
if (
target_dataframe.ww.time_index
and target_dataframe.ww.time_index not in cutoff_time.columns
):
raise AttributeError(
"Cutoff time DataFrame must contain a column with either the same name"
' as the target dataframe time_index or a column named "time"',
)
# rename to time
cutoff_time.rename(
columns={target_dataframe.ww.time_index: "time"},
inplace=True,
)
# Make sure user supplies only one valid name for instance id and time columns
if (
"instance_id" in cutoff_time.columns
and target_dataframe.ww.index in cutoff_time.columns
and "instance_id" != target_dataframe.ww.index
):
raise AttributeError(
'Cutoff time DataFrame cannot contain both a column named "instance_id" and a column'
" with the same name as the target dataframe index",
)
if (
"time" in cutoff_time.columns
and target_dataframe.ww.time_index in cutoff_time.columns
and "time" != target_dataframe.ww.time_index
):
raise AttributeError(
'Cutoff time DataFrame cannot contain both a column named "time" and a column'
" with the same name as the target dataframe time index",
)
assert (
cutoff_time[["instance_id", "time"]].duplicated().sum() == 0
), "Duplicated rows in cutoff time dataframe."
if isinstance(cutoff_time, str):
try:
cutoff_time = pd.to_datetime(cutoff_time)
except ValueError as e:
raise ValueError(f"While parsing cutoff_time: {str(e)}")
except OverflowError as e:
raise OverflowError(f"While parsing cutoff_time: {str(e)}")
else:
if isinstance(cutoff_time, list):
raise TypeError("cutoff_time must be a single value or DataFrame")
return cutoff_time
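# Example (illustrative sketch; "customers" is a hypothetical target dataframe whose
# Woodwork index is "customer_id"):
#
#   ct = pd.DataFrame({
#       "customer_id": [1, 2, 3],
#       "time": pd.to_datetime(["2014-01-01", "2014-01-02", "2014-01-03"]),
#   })
#   _validate_cutoff_time(ct, es["customers"])
#   # -> returns the frame with "customer_id" renamed to "instance_id"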
def _check_cutoff_time_type(cutoff_time, es_time_type):
"""
Check that the cutoff time values are of the proper type given the entityset time type
"""
# Check that cutoff_time time type matches entityset time type
if isinstance(cutoff_time, tuple):
cutoff_time_value = cutoff_time[0]
time_type = _check_time_type(cutoff_time_value)
is_numeric = time_type == "numeric"
is_datetime = time_type == Datetime
else:
cutoff_time_col = cutoff_time.ww["time"]
is_numeric = cutoff_time_col.ww.schema.is_numeric
is_datetime = cutoff_time_col.ww.schema.is_datetime
if es_time_type == "numeric" and not is_numeric:
raise TypeError(
"cutoff_time times must be numeric: try casting " "via pd.to_numeric()",
)
if es_time_type == Datetime and not is_datetime:
raise TypeError(
"cutoff_time times must be datetime type: try casting "
"via pd.to_datetime()",
)
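# Example (illustrative): for an EntitySet whose time indexes are datetimes,
#   _check_cutoff_time_type((pd.Timestamp("2014-01-01"), [1, 2]), Datetime)  # passes
#   _check_cutoff_time_type((5, [1, 2]), Datetime)  # raises TypeError
# where the tuple form is (cutoff time value, instance ids).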
def replace_inf_values(feature_matrix, replacement_value=np.nan, columns=None):
"""Replace all ``np.inf`` values in a feature matrix with the specified replacement value.
Args:
feature_matrix (DataFrame): DataFrame whose columns are feature names and rows are instances
replacement_value (int, float, str, optional): Value with which ``np.inf`` values will be replaced
columns (list[str], optional): A list specifying which columns should have values replaced. If None,
values will be replaced for all columns.
Returns:
feature_matrix
"""
if columns is None:
feature_matrix = feature_matrix.replace([np.inf, -np.inf], replacement_value)
else:
feature_matrix[columns] = feature_matrix[columns].replace(
[np.inf, -np.inf],
replacement_value,
)
return feature_matrix
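# Example (illustrative):
#   fm = pd.DataFrame({"ratio": [1.5, np.inf], "diff": [-np.inf, 2.0]})
#   replace_inf_values(fm)                                           # both infs become NaN
#   replace_inf_values(fm, replacement_value=0, columns=["ratio"])   # only "ratio" replaced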
def get_ww_types_from_features(
features,
entityset,
pass_columns=None,
cutoff_time=None,
):
"""Given a list of features and entityset (and optionally a list of pass
through columns and the cutoff time dataframe), returns the logical types,
semantic tags,and origin of each column in the feature matrix. Both
pass_columns and cutoff_time will need to be supplied in order to get the
type information for the pass through columns
"""
if pass_columns is None:
pass_columns = []
logical_types = {}
semantic_tags = {}
origins = {}
for feature in features:
names = feature.get_feature_names()
for name in names:
logical_types[name] = feature.column_schema.logical_type
semantic_tags[name] = feature.column_schema.semantic_tags.copy()
semantic_tags[name] -= {"index", "time_index"}
if logical_types[name] is None and "numeric" in semantic_tags[name]:
logical_types[name] = Double
if all([f.primitive is None for f in feature.get_dependencies(deep=True)]):
origins[name] = "base"
else:
origins[name] = "engineered"
if pass_columns:
cutoff_schema = cutoff_time.ww.schema
for column in pass_columns:
logical_types[column] = cutoff_schema.logical_types[column]
semantic_tags[column] = cutoff_schema.semantic_tags[column]
origins[column] = "base"
if entityset.dataframe_type in (Library.DASK, Library.SPARK):
target_dataframe_name = features[0].dataframe_name
table_schema = entityset[target_dataframe_name].ww.schema
index_col = table_schema.index
logical_types[index_col] = table_schema.logical_types[index_col]
semantic_tags[index_col] = table_schema.semantic_tags[index_col]
semantic_tags[index_col] -= {"index"}
origins[index_col] = "base"
ww_init = {
"logical_types": logical_types,
"semantic_tags": semantic_tags,
"column_origins": origins,
}
return ww_init
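# The returned dictionary is shaped for Woodwork initialization of the feature matrix,
# e.g. (illustrative values):
#   {
#       "logical_types": {"MEAN(transactions.amount)": Double, ...},
#       "semantic_tags": {"MEAN(transactions.amount)": {"numeric"}, ...},
#       "column_origins": {"MEAN(transactions.amount)": "engineered", ...},
#   }
# and is intended to be unpacked into the feature matrix's Woodwork init call.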