Source code for featuretools.feature_base.feature_base

from woodwork.column_schema import ColumnSchema
from woodwork.logical_types import Boolean

from featuretools import primitives
from featuretools.entityset.relationship import Relationship, RelationshipPath
from featuretools.entityset.timedelta import Timedelta
from featuretools.feature_base.utils import is_valid_input
from featuretools.primitives.base import (
    AggregationPrimitive,
    PrimitiveBase,
    TransformPrimitive
)
from featuretools.primitives.utils import serialize_primitive
from featuretools.utils.wrangle import (
    _check_time_against_column,
    _check_timedelta
)

_ES_REF = {}
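# _ES_REF maps an EntitySet's id (stored in each dataframe's Woodwork
# metadata) to the live EntitySet object, letting a feature recover its
# EntitySet from a dataframe alone (see FeatureBase.__init__ below).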


class FeatureBase(object):

    def __init__(self, dataframe, base_features, relationship_path, primitive, name=None, names=None):
        """Base class for all features

        Args:
            dataframe (DataFrame): dataframe for calculating this feature
            base_features (list[FeatureBase]): list of base features for primitive
            relationship_path (RelationshipPath): path from this dataframe to the
                dataframe of the base features.
            primitive (:class:`.PrimitiveBase`): primitive to calculate. If not
                initialized when passed, gets initialized with no arguments.
            name (str): name of the feature. Defaults to None.
            names (list[str]): names of the output columns of a multi-output
                feature. Defaults to None.
        """
        assert all(isinstance(f, FeatureBase) for f in base_features), \
            "All base features must be features"

        self.dataframe_name = dataframe.ww.name
        self.entityset = _ES_REF[dataframe.ww.metadata['entityset_id']]

        self.base_features = base_features

        # initialize if not already initialized
        if not isinstance(primitive, PrimitiveBase):
            primitive = primitive()
        self.primitive = primitive

        self.relationship_path = relationship_path

        self._name = name

        self._names = names

        assert self._check_input_types(), ("Provided inputs don't match input "
                                           "type requirements")

    def __getitem__(self, key):
        assert self.number_output_features > 1, \
            'can only access slice of multi-output feature'
        assert self.number_output_features > key, \
            'index is higher than the number of outputs'
        return FeatureOutputSlice(self, key)
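    # Illustrative (hypothetical) slicing of a multi-output feature, assuming a
    # multi-output primitive such as featuretools' NMostCommon; the EntitySet
    # ``es`` and its dataframe/column names are placeholders:
    #
    #   top_products = Feature(es['log'].ww['product_id'],
    #                          parent_dataframe_name='sessions',
    #                          primitive=NMostCommon(n=3))
    #   first_output = top_products[0]  # FeatureOutputSlice for output column 0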

    @classmethod
    def from_dictionary(cls, arguments, entityset, dependencies, primitives_deserializer):
        raise NotImplementedError("Must define from_dictionary on FeatureBase subclass")

    def rename(self, name):
        """Rename Feature, returns copy"""
        feature_copy = self.copy()
        feature_copy._name = name
        feature_copy._names = None
        return feature_copy
    def copy(self):
        raise NotImplementedError("Must define copy on FeatureBase subclass")

    def get_name(self):
        if not self._name:
            self._name = self.generate_name()
        return self._name

    def get_feature_names(self):
        if not self._names:
            if self.number_output_features == 1:
                self._names = [self.get_name()]
            else:
                self._names = self.generate_names()
                if self.get_name() != self.generate_name():
                    self._names = [self.get_name() + '[{}]'.format(i)
                                   for i in range(len(self._names))]
        return self._names

    def get_function(self, **kwargs):
        return self.primitive.get_function(**kwargs)

    def get_dependencies(self, deep=False, ignored=None, copy=True):
        """Returns features that are used to calculate this feature

        .. note::

            If you only want the features that make up the input to the feature
            function use the base_features attribute instead.
        """
        deps = []

        for d in self.base_features[:]:
            deps += [d]

        if hasattr(self, "where") and self.where:
            deps += [self.where]

        if ignored is None:
            ignored = set([])
        deps = [d for d in deps if d.unique_name() not in ignored]

        if deep:
            for dep in deps[:]:  # copy so we don't modify list we iterate over
                deep_deps = dep.get_dependencies(deep, ignored)
                deps += deep_deps

        return deps
    def get_depth(self, stop_at=None):
        """Returns depth of feature"""
        max_depth = 0
        stop_at_set = set()
        if stop_at is not None:
            stop_at_set = set([i.unique_name() for i in stop_at])
            if self.unique_name() in stop_at_set:
                return 0
        for dep in self.get_dependencies(deep=True, ignored=stop_at_set):
            max_depth = max(dep.get_depth(stop_at=stop_at), max_depth)
        return max_depth + 1
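    # Depth counts the longest dependency chain beneath a feature: an
    # IdentityFeature has depth 0 (see its override below), a transform of an
    # identity feature has depth 1, an aggregation of that transform has depth
    # 2, and so on. Features listed in ``stop_at`` are treated as depth 0.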
    def _check_input_types(self):
        if len(self.base_features) == 0:
            return True

        input_types = self.primitive.input_types
        if input_types is not None:
            if type(input_types[0]) != list:
                input_types = [input_types]

            for t in input_types:
                zipped = list(zip(t, self.base_features))
                if all([is_valid_input(f.column_schema, t) for t, f in zipped]):
                    return True
        else:
            return True

        return False

    @property
    def dataframe(self):
        """Dataframe this feature belongs to"""
        return self.entityset[self.dataframe_name]

    @property
    def number_output_features(self):
        return self.primitive.number_output_features

    def __repr__(self):
        return "<Feature: %s>" % (self.get_name())

    def hash(self):
        return hash(self.get_name() + self.dataframe_name)

    def __hash__(self):
        return self.hash()

    @property
    def column_schema(self):
        feature = self
        column_schema = self.primitive.return_type
        while column_schema is None:
            # get column_schema of first base feature
            base_feature = feature.base_features[0]
            column_schema = base_feature.column_schema

            # only the original time index should exist
            # so make this feature's return type just a Datetime
            if 'time_index' in column_schema.semantic_tags:
                column_schema = ColumnSchema(logical_type=column_schema.logical_type,
                                             semantic_tags=column_schema.semantic_tags - {"time_index"})
            elif 'index' in column_schema.semantic_tags:
                column_schema = ColumnSchema(logical_type=column_schema.logical_type,
                                             semantic_tags=column_schema.semantic_tags - {"index"})
                # Need to add back in the numeric standard tag so the schema
                # can get recognized as a valid return type
                if column_schema.is_numeric:
                    column_schema.semantic_tags.add('numeric')
                if column_schema.is_categorical:
                    column_schema.semantic_tags.add('category')

            # direct features should keep the foreign key tag, but all other
            # features should get converted
            if not isinstance(feature, DirectFeature) and 'foreign_key' in column_schema.semantic_tags:
                column_schema = ColumnSchema(logical_type=column_schema.logical_type,
                                             semantic_tags=column_schema.semantic_tags - {"foreign_key"})

            feature = base_feature

        return column_schema

    @property
    def default_value(self):
        return self.primitive.default_value

    def get_arguments(self):
        raise NotImplementedError("Must define get_arguments on FeatureBase subclass")

    def to_dictionary(self):
        return {
            'type': type(self).__name__,
            'dependencies': [dep.unique_name() for dep in self.get_dependencies()],
            'arguments': self.get_arguments(),
        }

    def _handle_binary_comparision(self, other, Primitive, PrimitiveScalar):
        if isinstance(other, FeatureBase):
            return Feature([self, other], primitive=Primitive)

        return Feature([self], primitive=PrimitiveScalar(other))

    def __eq__(self, other):
        """Compares to other by equality"""
        return self._handle_binary_comparision(other, primitives.Equal, primitives.EqualScalar)

    def __ne__(self, other):
        """Compares to other by non-equality"""
        return self._handle_binary_comparision(other, primitives.NotEqual, primitives.NotEqualScalar)

    def __gt__(self, other):
        """Compares if greater than other"""
        return self._handle_binary_comparision(other, primitives.GreaterThan, primitives.GreaterThanScalar)

    def __ge__(self, other):
        """Compares if greater than or equal to other"""
        return self._handle_binary_comparision(other, primitives.GreaterThanEqualTo, primitives.GreaterThanEqualToScalar)

    def __lt__(self, other):
        """Compares if less than other"""
        return self._handle_binary_comparision(other, primitives.LessThan, primitives.LessThanScalar)

    def __le__(self, other):
        """Compares if less than or equal to other"""
        return self._handle_binary_comparision(other, primitives.LessThanEqualTo, primitives.LessThanEqualToScalar)

    def __add__(self, other):
        """Add other"""
        return self._handle_binary_comparision(other, primitives.AddNumeric, primitives.AddNumericScalar)

    def __radd__(self, other):
        return self.__add__(other)

    def __sub__(self, other):
        """Subtract other"""
        return self._handle_binary_comparision(other, primitives.SubtractNumeric, primitives.SubtractNumericScalar)

    def __rsub__(self, other):
        return Feature([self], primitive=primitives.ScalarSubtractNumericFeature(other))

    def __div__(self, other):
        """Divide by other"""
        return self._handle_binary_comparision(other, primitives.DivideNumeric, primitives.DivideNumericScalar)

    def __truediv__(self, other):
        return self.__div__(other)

    def __rtruediv__(self, other):
        return self.__rdiv__(other)

    def __rdiv__(self, other):
        return Feature([self], primitive=primitives.DivideByFeature(other))

    def __mul__(self, other):
        """Multiply by other"""
        if isinstance(other, FeatureBase):
            if all([isinstance(f.column_schema.logical_type, Boolean) for f in (self, other)]):
                return Feature([self, other], primitive=primitives.MultiplyBoolean)
        return self._handle_binary_comparision(other, primitives.MultiplyNumeric, primitives.MultiplyNumericScalar)

    def __rmul__(self, other):
        return self.__mul__(other)

    def __mod__(self, other):
        """Take modulus of other"""
        return self._handle_binary_comparision(other, primitives.ModuloNumeric, primitives.ModuloNumericScalar)

    def __rmod__(self, other):
        return Feature([self], primitive=primitives.ModuloByFeature(other))

    def __and__(self, other):
        return self.AND(other)

    def __rand__(self, other):
        return Feature([other, self], primitive=primitives.And)

    def __or__(self, other):
        return self.OR(other)

    def __ror__(self, other):
        return Feature([other, self], primitive=primitives.Or)

    def __not__(self, other):
        return self.NOT(other)

    def __abs__(self):
        return Feature([self], primitive=primitives.Absolute)

    def __neg__(self):
        return Feature([self], primitive=primitives.Negate)

    def AND(self, other_feature):
        """Logical AND with other_feature"""
        return Feature([self, other_feature], primitive=primitives.And)

    def OR(self, other_feature):
        """Logical OR with other_feature"""
        return Feature([self, other_feature], primitive=primitives.Or)

    def NOT(self):
        """Creates inverse of feature"""
        return Feature([self], primitive=primitives.Not)

    def isin(self, list_of_output):
        return Feature([self], primitive=primitives.IsIn(list_of_outputs=list_of_output))

    def is_null(self):
        """Compares feature to null by equality"""
        return Feature([self], primitive=primitives.IsNull)

    def __invert__(self):
        return self.NOT()

    def unique_name(self):
        return u"%s: %s" % (self.dataframe_name, self.get_name())

    def relationship_path_name(self):
        return self.relationship_path.name


class IdentityFeature(FeatureBase):
    """Feature for dataframe that is equivalent to underlying column"""

    def __init__(self, column, name=None):
        self.column_name = column.ww.name
        self.return_type = column.ww.schema

        metadata = column.ww.schema._metadata
        es = _ES_REF[metadata['entityset_id']]

        super(IdentityFeature, self).__init__(dataframe=es[metadata['dataframe_name']],
                                              base_features=[],
                                              relationship_path=RelationshipPath([]),
                                              primitive=PrimitiveBase,
                                              name=name)

    @classmethod
    def from_dictionary(cls, arguments, entityset, dependencies, primitives_deserializer):
        dataframe_name = arguments['dataframe_name']
        column_name = arguments['column_name']
        column = entityset[dataframe_name].ww[column_name]
        return cls(column=column, name=arguments['name'])

    def copy(self):
        """Return copy of feature"""
        return IdentityFeature(self.entityset[self.dataframe_name].ww[self.column_name])

    def generate_name(self):
        return self.column_name

    def get_depth(self, stop_at=None):
        return 0

    def get_arguments(self):
        return {
            'name': self._name,
            'column_name': self.column_name,
            'dataframe_name': self.dataframe_name,
        }

    @property
    def column_schema(self):
        return self.return_type


class DirectFeature(FeatureBase):
    """Feature for child dataframe that inherits a feature value from a parent dataframe"""
    input_types = [ColumnSchema()]
    return_type = None

    def __init__(self, base_feature, child_dataframe_name, relationship=None, name=None):
        base_feature = _validate_base_features(base_feature)[0]

        self.parent_dataframe_name = base_feature.dataframe_name
        relationship = self._handle_relationship(base_feature.entityset, child_dataframe_name, relationship)
        child_dataframe = base_feature.entityset[child_dataframe_name]

        super(DirectFeature, self).__init__(dataframe=child_dataframe,
                                            base_features=[base_feature],
                                            relationship_path=RelationshipPath([(True, relationship)]),
                                            primitive=PrimitiveBase,
                                            name=name)

    def _handle_relationship(self, entityset, child_dataframe_name, relationship):
        child_dataframe = entityset[child_dataframe_name]
        if relationship:
            relationship_child = relationship.child_dataframe
            assert child_dataframe.ww.name == relationship_child.ww.name, \
                'child_dataframe must be the relationship child dataframe'

            assert self.parent_dataframe_name == relationship.parent_dataframe.ww.name, \
                'Base feature must be defined on the relationship parent dataframe'
        else:
            child_relationships = entityset.get_forward_relationships(child_dataframe.ww.name)
            possible_relationships = (r for r in child_relationships
                                      if r.parent_dataframe.ww.name == self.parent_dataframe_name)
            relationship = next(possible_relationships, None)

            if not relationship:
                raise RuntimeError('No relationship from "%s" to "%s" found.'
                                   % (child_dataframe.ww.name, self.parent_dataframe_name))
            # Check for another path.
            elif next(possible_relationships, None):
                message = "There are multiple relationships to the base dataframe. " \
                          "You must specify a relationship."
                raise RuntimeError(message)

        return relationship

    @classmethod
    def from_dictionary(cls, arguments, entityset, dependencies, primitives_deserializer):
        base_feature = dependencies[arguments['base_feature']]
        relationship = Relationship.from_dictionary(arguments['relationship'], entityset)
        child_dataframe_name = relationship.child_dataframe.ww.name
        return cls(base_feature=base_feature,
                   child_dataframe_name=child_dataframe_name,
                   relationship=relationship,
                   name=arguments['name'])

    @property
    def number_output_features(self):
        return self.base_features[0].number_output_features

    @property
    def default_value(self):
        return self.base_features[0].default_value

    def copy(self):
        """Return copy of feature"""
        _is_forward, relationship = self.relationship_path[0]
        return DirectFeature(self.base_features[0],
                             self.dataframe_name,
                             relationship=relationship)

    @property
    def column_schema(self):
        return self.base_features[0].column_schema

    def generate_name(self):
        return self._name_from_base(self.base_features[0].get_name())

    def generate_names(self):
        return [self._name_from_base(base_name)
                for base_name in self.base_features[0].get_feature_names()]

    def get_arguments(self):
        _is_forward, relationship = self.relationship_path[0]
        return {
            'name': self._name,
            'base_feature': self.base_features[0].unique_name(),
            'relationship': relationship.to_dictionary(),
        }

    def _name_from_base(self, base_name):
        return u"%s.%s" % (self.relationship_path_name(), base_name)


class AggregationFeature(FeatureBase):
    # Feature to condition this feature by in
    # computation (e.g. take the Count of products where the product_id is
    # "basketball".)
    where = None
    #: (str or :class:`.Timedelta`): Use only some amount of previous data from
    # each time point during calculation
    use_previous = None

    def __init__(self, base_features, parent_dataframe_name, primitive,
                 relationship_path=None, use_previous=None, where=None, name=None):
        base_features = _validate_base_features(base_features)

        for bf in base_features:
            if bf.number_output_features > 1:
                raise ValueError("Cannot stack on whole multi-output feature.")

        self.child_dataframe_name = base_features[0].dataframe_name
        entityset = base_features[0].entityset
        relationship_path, self._path_is_unique = \
            self._handle_relationship_path(entityset, parent_dataframe_name, relationship_path)

        self.parent_dataframe_name = parent_dataframe_name

        if where is not None:
            self.where = _validate_base_features(where)[0]
            msg = "Where feature must be defined on child dataframe {}".format(
                self.child_dataframe_name)
            assert self.where.dataframe_name == self.child_dataframe_name, msg

        if use_previous:
            assert entityset[self.child_dataframe_name].ww.time_index is not None, (
                "Applying function that requires time index to dataframe that "
                "doesn't have one")

            self.use_previous = _check_timedelta(use_previous)
            assert len(base_features) > 0
            time_index = base_features[0].dataframe.ww.time_index
            time_col = base_features[0].dataframe.ww[time_index]
            assert time_index is not None, ("Use previous can only be defined "
                                            "on dataframes with a time index")
            assert _check_time_against_column(self.use_previous, time_col)

        super(AggregationFeature, self).__init__(dataframe=entityset[parent_dataframe_name],
                                                 base_features=base_features,
                                                 relationship_path=relationship_path,
                                                 primitive=primitive,
                                                 name=name)

    def _handle_relationship_path(self, entityset, parent_dataframe_name, relationship_path):
        parent_dataframe = entityset[parent_dataframe_name]
        child_dataframe = entityset[self.child_dataframe_name]

        if relationship_path:
            assert all(not is_forward for is_forward, _r in relationship_path), \
                'All relationships in path must be backward'

            _is_forward, first_relationship = relationship_path[0]
            first_parent = first_relationship.parent_dataframe
            assert parent_dataframe.ww.name == first_parent.ww.name, \
                'parent_dataframe must match first relationship in path.'

            _is_forward, last_relationship = relationship_path[-1]
            assert child_dataframe.ww.name == last_relationship.child_dataframe.ww.name, \
                'Base feature must be defined on the dataframe at the end of relationship_path'

            path_is_unique = entityset \
                .has_unique_forward_path(child_dataframe.ww.name, parent_dataframe.ww.name)
        else:
            paths = entityset \
                .find_backward_paths(parent_dataframe.ww.name, child_dataframe.ww.name)
            first_path = next(paths, None)

            if not first_path:
                raise RuntimeError('No backward path from "%s" to "%s" found.'
                                   % (parent_dataframe.ww.name, child_dataframe.ww.name))
            # Check for another path.
            elif next(paths, None):
                message = "There are multiple possible paths to the base dataframe. " \
                          "You must specify a relationship path."
                raise RuntimeError(message)

            relationship_path = RelationshipPath([(False, r) for r in first_path])
            path_is_unique = True

        return relationship_path, path_is_unique

    @classmethod
    def from_dictionary(cls, arguments, entityset, dependencies, primitives_deserializer):
        base_features = [dependencies[name] for name in arguments['base_features']]
        relationship_path = [Relationship.from_dictionary(r, entityset)
                             for r in arguments['relationship_path']]
        parent_dataframe_name = relationship_path[0].parent_dataframe.ww.name
        relationship_path = RelationshipPath([(False, r) for r in relationship_path])

        primitive = primitives_deserializer.deserialize_primitive(arguments['primitive'])

        use_previous_data = arguments['use_previous']
        use_previous = use_previous_data and Timedelta.from_dictionary(use_previous_data)

        where_name = arguments['where']
        where = where_name and dependencies[where_name]

        return cls(base_features=base_features,
                   parent_dataframe_name=parent_dataframe_name,
                   primitive=primitive,
                   relationship_path=relationship_path,
                   use_previous=use_previous,
                   where=where,
                   name=arguments['name'])

    def copy(self):
        return AggregationFeature(self.base_features,
                                  parent_dataframe_name=self.parent_dataframe_name,
                                  relationship_path=self.relationship_path,
                                  primitive=self.primitive,
                                  use_previous=self.use_previous,
                                  where=self.where)

    def _where_str(self):
        if self.where is not None:
            where_str = u" WHERE " + self.where.get_name()
        else:
            where_str = ''
        return where_str

    def _use_prev_str(self):
        if self.use_previous is not None and hasattr(self.use_previous, 'get_name'):
            use_prev_str = u", Last {}".format(self.use_previous.get_name())
        else:
            use_prev_str = u''
        return use_prev_str

    def generate_name(self):
        return self.primitive.generate_name(base_feature_names=[bf.get_name() for bf in self.base_features],
                                            relationship_path_name=self.relationship_path_name(),
                                            parent_dataframe_name=self.parent_dataframe_name,
                                            where_str=self._where_str(),
                                            use_prev_str=self._use_prev_str())

    def generate_names(self):
        return self.primitive.generate_names(base_feature_names=[bf.get_name() for bf in self.base_features],
                                             relationship_path_name=self.relationship_path_name(),
                                             parent_dataframe_name=self.parent_dataframe_name,
                                             where_str=self._where_str(),
                                             use_prev_str=self._use_prev_str())

    def get_arguments(self):
        return {
            'name': self._name,
            'base_features': [feat.unique_name() for feat in self.base_features],
            'relationship_path': [r.to_dictionary() for _, r in self.relationship_path],
            'primitive': serialize_primitive(self.primitive),
            'where': self.where and self.where.unique_name(),
            'use_previous': self.use_previous and self.use_previous.get_arguments(),
        }

    def relationship_path_name(self):
        if self._path_is_unique:
            return self.child_dataframe_name
        else:
            return self.relationship_path.name


class TransformFeature(FeatureBase):
    def __init__(self, base_features, primitive, name=None):
        # Any edits made to this method should also be made to the
        # new_class_init method in make_trans_primitive
        base_features = _validate_base_features(base_features)

        for bf in base_features:
            if bf.number_output_features > 1:
                raise ValueError("Cannot stack on whole multi-output feature.")

        dataframe = base_features[0].entityset[base_features[0].dataframe_name]
        super(TransformFeature, self).__init__(dataframe=dataframe,
                                               base_features=base_features,
                                               relationship_path=RelationshipPath([]),
                                               primitive=primitive,
                                               name=name)

    @classmethod
    def from_dictionary(cls, arguments, entityset, dependencies, primitives_deserializer):
        base_features = [dependencies[name] for name in arguments['base_features']]
        primitive = primitives_deserializer.deserialize_primitive(arguments['primitive'])
        return cls(base_features=base_features, primitive=primitive, name=arguments['name'])

    def copy(self):
        return TransformFeature(self.base_features, self.primitive)

    def generate_name(self):
        return self.primitive.generate_name(base_feature_names=[bf.get_name() for bf in self.base_features])

    def generate_names(self):
        return self.primitive.generate_names(base_feature_names=[bf.get_name() for bf in self.base_features])

    def get_arguments(self):
        return {
            'name': self._name,
            'base_features': [feat.unique_name() for feat in self.base_features],
            'primitive': serialize_primitive(self.primitive)
        }


class GroupByTransformFeature(TransformFeature):
    def __init__(self, base_features, primitive, groupby, name=None):
        if not isinstance(groupby, FeatureBase):
            groupby = IdentityFeature(groupby)
        assert len({"category", "foreign_key"} - groupby.column_schema.semantic_tags) < 2
        self.groupby = groupby

        base_features = _validate_base_features(base_features)
        base_features.append(groupby)

        super(GroupByTransformFeature, self).__init__(base_features=base_features,
                                                      primitive=primitive,
                                                      name=name)

    @classmethod
    def from_dictionary(cls, arguments, entityset, dependencies, primitives_deserializer):
        base_features = [dependencies[name] for name in arguments['base_features']]
        primitive = primitives_deserializer.deserialize_primitive(arguments['primitive'])
        groupby = dependencies[arguments['groupby']]
        return cls(base_features=base_features,
                   primitive=primitive,
                   groupby=groupby,
                   name=arguments['name'])

    def copy(self):
        # the groupby feature is appended to base_features in the __init__
        # so here we separate them again
        return GroupByTransformFeature(self.base_features[:-1],
                                       self.primitive,
                                       self.groupby)

    def generate_name(self):
        # exclude the groupby feature from base_names since it has a special
        # place in the feature name
        base_names = [bf.get_name() for bf in self.base_features[:-1]]
        _name = self.primitive.generate_name(base_names)
        return u"{} by {}".format(_name, self.groupby.get_name())

    def generate_names(self):
        base_names = [bf.get_name() for bf in self.base_features[:-1]]
        _names = self.primitive.generate_names(base_names)
        names = [name + " by {}".format(self.groupby.get_name()) for name in _names]
        return names

    def get_arguments(self):
        # Do not include groupby in base_features.
        feature_names = [feat.unique_name() for feat in self.base_features
                         if feat.unique_name() != self.groupby.unique_name()]
        return {
            'name': self._name,
            'base_features': feature_names,
            'primitive': serialize_primitive(self.primitive),
            'groupby': self.groupby.unique_name(),
        }


class Feature(object):
    """
    Alias to create feature. Infers the feature type based on init parameters.
    """

    def __new__(self, base, dataframe_name=None, groupby=None, parent_dataframe_name=None,
                primitive=None, use_previous=None, where=None):
        # either direct or identity
        if primitive is None and dataframe_name is None:
            return IdentityFeature(base)
        elif primitive is None and dataframe_name is not None:
            return DirectFeature(base, dataframe_name)
        elif primitive is not None and parent_dataframe_name is not None:
            assert isinstance(primitive, AggregationPrimitive) or issubclass(primitive, AggregationPrimitive)
            return AggregationFeature(base,
                                      parent_dataframe_name=parent_dataframe_name,
                                      use_previous=use_previous,
                                      where=where,
                                      primitive=primitive)
        elif primitive is not None:
            assert (isinstance(primitive, TransformPrimitive) or
                    issubclass(primitive, TransformPrimitive))
            if groupby is not None:
                return GroupByTransformFeature(base,
                                               primitive=primitive,
                                               groupby=groupby)
            return TransformFeature(base, primitive=primitive)

        raise Exception("Unrecognized feature initialization")


class FeatureOutputSlice(FeatureBase):
    """
    Class to access specific multi output feature column
    """

    def __init__(self, base_feature, n, name=None):
        base_features = [base_feature]
        self.num_output_parent = base_feature.number_output_features

        msg = "cannot access slice from single output feature"
        assert self.num_output_parent > 1, msg
        msg = "cannot access column that is not between 0 and " + str(self.num_output_parent - 1)
        assert n < self.num_output_parent, msg

        self.n = n
        self._name = name
        self._names = [name] if name else None
        self.base_features = base_features
        self.base_feature = base_features[0]

        self.dataframe_name = base_feature.dataframe_name
        self.entityset = base_feature.entityset
        self.primitive = base_feature.primitive

        self.relationship_path = base_feature.relationship_path

    def __getitem__(self, key):
        raise ValueError("Cannot get item from slice of multi output feature")

    def generate_name(self):
        return self.base_feature.get_feature_names()[self.n]

    @property
    def number_output_features(self):
        return 1

    def get_arguments(self):
        return {
            'name': self._name,
            'base_feature': self.base_feature.unique_name(),
            'n': self.n
        }

    @classmethod
    def from_dictionary(cls, arguments, entityset, dependencies, primitives_deserializer):
        base_feature_name = arguments['base_feature']
        base_feature = dependencies[base_feature_name]
        n = arguments['n']
        name = arguments['name']
        return cls(base_feature=base_feature, n=n, name=name)

    def copy(self):
        return FeatureOutputSlice(self.base_feature, self.n)


def _validate_base_features(feature):
    if 'Series' == type(feature).__name__:
        return [IdentityFeature(feature)]
    elif hasattr(feature, '__iter__'):
        features = [_validate_base_features(f)[0] for f in feature]
        msg = "all base features must share the same dataframe"
        assert len(set([bf.dataframe_name for bf in features])) == 1, msg
        return features
    elif isinstance(feature, FeatureBase):
        return [feature]
    else:
        raise Exception("Not a feature")
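
# A minimal usage sketch (not part of the library source) showing which
# subclass the ``Feature`` alias above dispatches to, and how the operator
# overloads on FeatureBase stack primitives. It assumes featuretools' bundled
# mock-customer demo EntitySet; the dataframe and column names below come from
# that demo data and are illustrative only.
if __name__ == "__main__":
    import featuretools as ft
    from featuretools.primitives import Hour, Sum

    es = ft.demo.load_mock_customer(return_entityset=True)

    # primitive=None and dataframe_name=None -> IdentityFeature
    amount = ft.Feature(es["transactions"].ww["amount"])

    # a TransformPrimitive with no parent dataframe -> TransformFeature
    hour = ft.Feature(es["transactions"].ww["transaction_time"], primitive=Hour)

    # an AggregationPrimitive plus parent_dataframe_name -> AggregationFeature
    total = ft.Feature(amount, parent_dataframe_name="sessions", primitive=Sum)

    # no primitive but a target dataframe_name -> DirectFeature
    device = ft.Feature(ft.Feature(es["sessions"].ww["device"]),
                        dataframe_name="transactions")

    # operator overloads build transform features via scalar primitives
    doubled = amount * 2  # MultiplyNumericScalar under the hood

    print([type(f).__name__ for f in (amount, hour, total, device, doubled)])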