Module sentspace.embedding.utils

Expand source code
from collections import defaultdict
from contextlib import contextmanager
import os
import pickle
import typing
import warnings
from pathlib import Path
import random

import numpy as np
import pandas as pd
import sentspace.utils
from sentspace.utils import io, text
from sentspace.utils.caching import cache_to_mem #, cache_to_disk
from tqdm import tqdm

import torch
from transformers import AutoModel, AutoConfig, AutoTokenizer


def download_embeddings(model_name='glove.840B.300d.txt'):
    raise NotImplementedError
    if 'glove' in model_name:
        url = 'https://huggingface.co/stanfordnlp/glove/resolve/main/glove.840B.300d.zip'
        # download(url)




def flatten_activations(activations: typing.Dict[int, np.array]) -> pd.DataFrame:
    """
    Convert layer-wise activations into flattened dataframe format.
    Input: dict, key = layer, item = nd array of representations of that layer (n_tokens, )
    Output: pd dataframe, MultiIndex (layer, unit)
    """
    labels = []
    arr_flat = []
    for layer, act_arr in activations.items():
        arr_flat.append(act_arr.reshape(1,-1))
        for i in range(act_arr.shape[0]): # across units
            labels.append(('representation', layer, i,))
    arr_flat = np.concatenate(arr_flat, axis=1) # concatenated activations across layers
    df = pd.DataFrame(arr_flat)
    df.columns = pd.MultiIndex.from_tuples(labels) # rows: stimuli, columns: units
    return df


@cache_to_mem
def load_embeddings(emb_file: str = 'glove.840B.300d.txt',
                    data_dir: Path = None,
                    vocab: tuple = ()):
    """
    Read through the embedding file to find embeddings for target words in vocab
    Return dict mapping word to embedding (numpy array)
    """
    try:
        data_dir = Path(data_dir)
    except TypeError:
        data_dir = Path(__file__).parent / '..' / '..' / '.feature_database/'
    
    vocab = set(vocab)
    OOV = set(vocab)

    io.log(f"loading embeddings from {emb_file} for vocab of size {len(vocab)}")
    w2v = {}
    with (data_dir / emb_file).open('r') as f:
        total_lines = sum(1 for _ in tqdm(f, desc=f'counting # of lines in {data_dir/emb_file}'))
    with (data_dir / emb_file).open('r') as f:
        for line in tqdm(f, total=total_lines, desc=f'searching for embeddings in {emb_file}'):
            token, *emb = line.split(' ')
            if token in vocab or len(vocab) == 0:
                # print(f'found {token}!')
                w2v[token] = np.asarray(emb, dtype=float)
                OOV.difference_update({token}) # calling .remove() on an empty set would give an error
    
    io.log(f"---done--- loading embeddings from {emb_file}. OOV count: {len(OOV)}/{len(vocab)}")
    OOVlist = [*OOV]
    random.shuffle(OOVlist)
    io.log(f"           a selection of up to 32 random OOV tokens: {OOVlist[:32]}")

    return w2v


def get_word_embeds(sentence: sentspace.Sentence.Sentence, w2v: typing.Dict[str, np.array], 
                    model_name: str = 'glove', dims: int = None) -> typing.Dict[str, typing.DefaultDict[None, list]]:
    """Extracts [static] word embeddings for tokens in the given sentence 

    Args:
        sentence ([sentspace.Sentence.Sentence]): a Sentence object
        w2v ([dict]): word embeddings dictionary as a mapping from token -> vector
        model_name (str, optional): [description]. Defaults to 'glove'.
        dims (int, optional): [description]. Defaults to 300.

    Raises:
        ValueError: [description]

    Returns:
        [type]: [description]
    """
    # layer -> [emb_t1 emb_t2 emb_t3 ...]
    embeddings = defaultdict(list)
    dims = dims or next(iter(w2v[model_name].values())).shape[-1]

    # OOV_words = set()
    for token in sentence:
        if token in w2v:
            embeddings[0].append(w2v[token])
        else:
            embeddings[0].append(np.repeat(np.nan, dims))
            # sentence.OOV[model_name].add(token)
            # OOV_words.add(token)
    
    return {model_name: embeddings}


@cache_to_mem
def load_huggingface(model_name_or_path: str = 'distilgpt2', device='cpu'):
    """loads and caches a huggingface model and tokenizer

    Args:
        model_name_or_path (str): Huggingface model hub identifier or path to directory 
                                  containing config and model weights. Defaults to 'gpt2'.

    Returns:
        Tuple[AutoModel, AutoTokenizer]: returns a model and a tokenizer
    """
    if 'TRANSFORMERS_CACHE' not in os.environ:
        os.environ['TRANSFORMERS_CACHE'] = str(Path(__file__).parent.parent.parent / 'TRANSFORMERS_CACHE/')
    io.log(f"loading HuggingFace model [{model_name_or_path}] using TRANSFORMERS_CACHE={os.environ['TRANSFORMERS_CACHE']}")
    t = AutoTokenizer.from_pretrained(model_name_or_path, cache_dir=os.environ['TRANSFORMERS_CACHE'])
    m = AutoModel.from_pretrained(model_name_or_path, cache_dir=os.environ['TRANSFORMERS_CACHE'])
    m.to(device)
    m.eval()

    return m, t


def get_huggingface_embeds(sentence: sentspace.Sentence.Sentence, 
                           model_name: str = 'distilgpt2',
                           layers: typing.Collection[int] = None,
                           dims: int = None) -> typing.Dict[str, typing.DefaultDict[int, list]]:
    """Extracts [static] word embeddings for tokens in the given sentence 

    Args:
        sentence (sentspace.Sentence.Sentence): [description]
        model_name (str, optional): [description]. Defaults to 'distilgpt2'.
        layers (list[int], optional): a collection of layers to extract from the model. if None, all layers are extracted.
        dims (int, optional): [description]. Defaults to None.

    Raises:
        ValueError: [description]

    Returns:
        typing.Dict[str, typing.DefaultDict[int, list]]: [description]
    """
    # layer -> [emb_t1 emb_t2 emb_t3 ...]
    representations = dict()

    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    model, tokenizer = load_huggingface(model_name, device=device)

    # we don't want to track gradients; only interested in the encoding
    model.eval()
    with torch.no_grad():

        # current procedure processes sentences individually. consider minibatching.
        batch_encoding = tokenizer(str(sentence), return_tensors="pt", truncation='longest_first').to(device)
        input_ids = batch_encoding['input_ids']
        
        # overflow_tokens = max(0, len(input_ids) - model.config.n_positions)
        # if overflow_tokens > 0: io.log(f"Stimulus too long! Truncated the first {overflow_tokens} tokens", type='WARN')
        
        # input_ids = input_ids[overflow_tokens:]
        # print(tokenizer.convert_ids_to_tokens(input_ids))

        output = model(input_ids, output_hidden_states=True, return_dict=True)
        hidden_states = output['hidden_states']

        #  for i in range(n_layer+1):
        for layer in range(len(hidden_states)):
            if layers is None or layer in layers:
                token = slice(None, None) # placeholder to allow a possibility of picking a particular token rather than the full sequence
                representations[layer] = hidden_states[layer].detach().cpu().squeeze().numpy()[token, :]

        # print(input_ids.shape, representations[0].shape)

    return {model_name: representations}

    

def pool_sentence_embeds(sentence, token_embeddings, filters={'nofilter': lambda i, x: True},
                         keys=None, methods={'mean', 'median'}):
    """pools embeddings of an entire sentence (given as a list of embeddings)
       using averaging, maxpooling, minpooling, etc., after applying all the
       provided filters as functions (such as content words only).

    Args:
        token_embeddings (list[np.array]): [description]
        filters (list[function[(idx, token) -> bool]], optional): [description]. Defaults to [lambda x: True].
            filters should be functions that map token to bool (e.g. is_content_word(...))
            only tokens that satisfy all filters are retained.
        keys (`typing.Union[typing.Collection[str], None]`): which models we want to pool using the methods supplied. 
            if None, all available models are pooled (separately) using the supplied methods
        methods (`typing.Collection[typing.Union[str, typing.Tuple[str, typing.Callable]]]`):

    Returns:
        dict: averaging method -> averaged embedding
    """                         

    """
    Return dataframe of each sentence no. and its sentence embedding
    from averaging embeddings of words in a sentence (ignore NAs)
    Parameters:
        df: dataframe, output of get_glove_word()
        content_only: if True, use content words only
        is_content_lst: list, values 1 if token is content word, 0 otherwise
        save: whether to save results
        save_path: path to save, support .csv & .mat files
    """
    
    # model -> method -> repr
    all_pooled = defaultdict(dict)

    for model_name in token_embeddings:

        # if the current model_name is not meant to be aggregated in this manner, skip
        # (e.g. BERT and last token "aggregation")
        # if keys is None, any aggregation step specified will be applied regardless of model_name
        # (e.g., mean)
        if keys and model_name not in keys: 
            continue

        # map from method --> layer (int) --> pooled representation per model
        # this entity still needs to be flattened
        model_pooled = defaultdict(dict)

        for layer in token_embeddings[model_name]:

            # all the embeddings corresponding to the tokens
            all_embeds = [e for i, (t, e) in enumerate(zip(sentence.tokens, token_embeddings[model_name][layer]))]
            all_tokens = [t for i, (t, e) in enumerate(zip(sentence.tokens, token_embeddings[model_name][layer]))]
            all_embeds = np.array(all_embeds, dtype=np.float32)
            all_tokens = np.array(all_tokens, dtype=str)
            # print(all_tokens, all_embeds.shape)

            # exclude OOV words' embeddings (they are all NaNs)
            not_nan_tokens = all_tokens[~np.isnan(all_embeds[:, 0])   ]
            not_nan_embeds = all_embeds[~np.isnan(all_embeds[:, 0]), :]

            # make a note of the shape of the vector (n x embed_dim)
            shape = not_nan_embeds.shape
            # TODO: vectorize operation on all tokensnow that it is an numpy array using apply()?
            if filters:
                mask = [all(fn(i, t) for fn_name, fn in filters.items()) for i, t in enumerate(not_nan_tokens)]
            else:
                mask = slice(None, None, None)

            filtered_embeds = not_nan_embeds[mask]
            filtered_shape = filtered_embeds.shape

            # if filtering left no tokens, we will use all_embeds instead
            if filtered_shape[0] == 0:
                io.log(f'filtered embeddings for current sentence are empty. retrying without filters: {sentence.tokens}', type='WARN')

                # now what? use unfiltered (as a fallback)
                filtered_embeds = not_nan_embeds
                
            # [very rarely] if no word has a corresponding embedding, then we have no choice
            # but to return a zero vector (or, sometime in the future, a random vector??)
            if shape[0] == 0:
                filtered_embeds = np.zeros((1, shape[-1]))

            for method in methods:
                # if a pre-defined aggregation method is used, apply it
                if type(method) is str:
                    method_name = method
                    if method_name == 'median':
                        pooled = np.median(filtered_embeds, axis=0).reshape(-1)#.tolist()
                    elif method_name == 'mean':
                        pooled = filtered_embeds.mean(axis=0).reshape(-1) #.tolist()
                    elif method_name == 'last':
                        pooled = filtered_embeds[-1, :].reshape(-1) #.tolist()
                    elif method_name == 'first':
                        pooled = filtered_embeds[0, :].reshape(-1) #.tolist()
                    else:
                        raise ValueError(f'unknown pooling method identifier: {method}')
                # handle the case where a custom aggregation function is applied to the embeddings
                elif type(method) is tuple:
                    method_name, fn = method
                    pooled = fn(filtered_embeds).reshape(-1) #.tolist()

                else:
                    raise ValueError(method)

                model_pooled[method_name][layer] = pooled

        for method_name, layer_wise_reprs in model_pooled.items():
            all_pooled[model_name][method_name] = flatten_activations(layer_wise_reprs)
            all_pooled[model_name][method_name].index = [sentence.uid]

    return all_pooled

Functions

def download_embeddings(model_name='glove.840B.300d.txt')
Expand source code
def download_embeddings(model_name='glove.840B.300d.txt'):
    raise NotImplementedError
    if 'glove' in model_name:
        url = 'https://huggingface.co/stanfordnlp/glove/resolve/main/glove.840B.300d.zip'
        # download(url)
def flatten_activations(activations: Dict[int, ]) ‑> pandas.core.frame.DataFrame

Convert layer-wise activations into flattened dataframe format. Input: dict, key = layer, item = nd array of representations of that layer (n_tokens, ) Output: pd dataframe, MultiIndex (layer, unit)

Expand source code
def flatten_activations(activations: typing.Dict[int, np.array]) -> pd.DataFrame:
    """
    Convert layer-wise activations into flattened dataframe format.
    Input: dict, key = layer, item = nd array of representations of that layer (n_tokens, )
    Output: pd dataframe, MultiIndex (layer, unit)
    """
    labels = []
    arr_flat = []
    for layer, act_arr in activations.items():
        arr_flat.append(act_arr.reshape(1,-1))
        for i in range(act_arr.shape[0]): # across units
            labels.append(('representation', layer, i,))
    arr_flat = np.concatenate(arr_flat, axis=1) # concatenated activations across layers
    df = pd.DataFrame(arr_flat)
    df.columns = pd.MultiIndex.from_tuples(labels) # rows: stimuli, columns: units
    return df
def get_huggingface_embeds(sentence: Sentence, model_name: str = 'distilgpt2', layers: Collection[int] = None, dims: int = None) ‑> Dict[str, DefaultDict[int, list]]

Extracts [static] word embeddings for tokens in the given sentence

Args

sentence : Sentence
[description]
model_name : str, optional
[description]. Defaults to 'distilgpt2'.
layers : list[int], optional
a collection of layers to extract from the model. if None, all layers are extracted.
dims : int, optional
[description]. Defaults to None.

Raises

ValueError
[description]

Returns

typing.Dict[str, typing.DefaultDict[int, list]]
[description]
Expand source code
def get_huggingface_embeds(sentence: sentspace.Sentence.Sentence, 
                           model_name: str = 'distilgpt2',
                           layers: typing.Collection[int] = None,
                           dims: int = None) -> typing.Dict[str, typing.DefaultDict[int, list]]:
    """Extracts [static] word embeddings for tokens in the given sentence 

    Args:
        sentence (sentspace.Sentence.Sentence): [description]
        model_name (str, optional): [description]. Defaults to 'distilgpt2'.
        layers (list[int], optional): a collection of layers to extract from the model. if None, all layers are extracted.
        dims (int, optional): [description]. Defaults to None.

    Raises:
        ValueError: [description]

    Returns:
        typing.Dict[str, typing.DefaultDict[int, list]]: [description]
    """
    # layer -> [emb_t1 emb_t2 emb_t3 ...]
    representations = dict()

    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    model, tokenizer = load_huggingface(model_name, device=device)

    # we don't want to track gradients; only interested in the encoding
    model.eval()
    with torch.no_grad():

        # current procedure processes sentences individually. consider minibatching.
        batch_encoding = tokenizer(str(sentence), return_tensors="pt", truncation='longest_first').to(device)
        input_ids = batch_encoding['input_ids']
        
        # overflow_tokens = max(0, len(input_ids) - model.config.n_positions)
        # if overflow_tokens > 0: io.log(f"Stimulus too long! Truncated the first {overflow_tokens} tokens", type='WARN')
        
        # input_ids = input_ids[overflow_tokens:]
        # print(tokenizer.convert_ids_to_tokens(input_ids))

        output = model(input_ids, output_hidden_states=True, return_dict=True)
        hidden_states = output['hidden_states']

        #  for i in range(n_layer+1):
        for layer in range(len(hidden_states)):
            if layers is None or layer in layers:
                token = slice(None, None) # placeholder to allow a possibility of picking a particular token rather than the full sequence
                representations[layer] = hidden_states[layer].detach().cpu().squeeze().numpy()[token, :]

        # print(input_ids.shape, representations[0].shape)

    return {model_name: representations}
def get_word_embeds(sentence: Sentence, w2v: Dict[str, ], model_name: str = 'glove', dims: int = None) ‑> Dict[str, DefaultDict[None, list]]

Extracts [static] word embeddings for tokens in the given sentence

Args

sentence : [Sentence]
a Sentence object
w2v : [dict]
word embeddings dictionary as a mapping from token -> vector
model_name : str, optional
[description]. Defaults to 'glove'.
dims : int, optional
[description]. Defaults to 300.

Raises

ValueError
[description]

Returns

[type]
[description]
Expand source code
def get_word_embeds(sentence: sentspace.Sentence.Sentence, w2v: typing.Dict[str, np.array], 
                    model_name: str = 'glove', dims: int = None) -> typing.Dict[str, typing.DefaultDict[None, list]]:
    """Extracts [static] word embeddings for tokens in the given sentence 

    Args:
        sentence ([sentspace.Sentence.Sentence]): a Sentence object
        w2v ([dict]): word embeddings dictionary as a mapping from token -> vector
        model_name (str, optional): [description]. Defaults to 'glove'.
        dims (int, optional): [description]. Defaults to 300.

    Raises:
        ValueError: [description]

    Returns:
        [type]: [description]
    """
    # layer -> [emb_t1 emb_t2 emb_t3 ...]
    embeddings = defaultdict(list)
    dims = dims or next(iter(w2v[model_name].values())).shape[-1]

    # OOV_words = set()
    for token in sentence:
        if token in w2v:
            embeddings[0].append(w2v[token])
        else:
            embeddings[0].append(np.repeat(np.nan, dims))
            # sentence.OOV[model_name].add(token)
            # OOV_words.add(token)
    
    return {model_name: embeddings}
def pool_sentence_embeds(sentence, token_embeddings, filters={'nofilter': <function <lambda>>}, keys=None, methods={'mean', 'median'})

pools embeddings of an entire sentence (given as a list of embeddings) using averaging, maxpooling, minpooling, etc., after applying all the provided filters as functions (such as content words only).

Args

token_embeddings : list[np.array]
[description]

filters (list[function[(idx, token) -> bool]], optional): [description]. Defaults to [lambda x: True]. filters should be functions that map token to bool (e.g. is_content_word(…)) only tokens that satisfy all filters are retained. keys (typing.Union[typing.Collection[str], None]): which models we want to pool using the methods supplied. if None, all available models are pooled (separately) using the supplied methods methods (typing.Collection[typing.Union[str, typing.Tuple[str, typing.Callable]]]):

Returns

dict
averaging method -> averaged embedding
Expand source code
def pool_sentence_embeds(sentence, token_embeddings, filters={'nofilter': lambda i, x: True},
                         keys=None, methods={'mean', 'median'}):
    """pools embeddings of an entire sentence (given as a list of embeddings)
       using averaging, maxpooling, minpooling, etc., after applying all the
       provided filters as functions (such as content words only).

    Args:
        token_embeddings (list[np.array]): [description]
        filters (list[function[(idx, token) -> bool]], optional): [description]. Defaults to [lambda x: True].
            filters should be functions that map token to bool (e.g. is_content_word(...))
            only tokens that satisfy all filters are retained.
        keys (`typing.Union[typing.Collection[str], None]`): which models we want to pool using the methods supplied. 
            if None, all available models are pooled (separately) using the supplied methods
        methods (`typing.Collection[typing.Union[str, typing.Tuple[str, typing.Callable]]]`):

    Returns:
        dict: averaging method -> averaged embedding
    """                         

    """
    Return dataframe of each sentence no. and its sentence embedding
    from averaging embeddings of words in a sentence (ignore NAs)
    Parameters:
        df: dataframe, output of get_glove_word()
        content_only: if True, use content words only
        is_content_lst: list, values 1 if token is content word, 0 otherwise
        save: whether to save results
        save_path: path to save, support .csv & .mat files
    """
    
    # model -> method -> repr
    all_pooled = defaultdict(dict)

    for model_name in token_embeddings:

        # if the current model_name is not meant to be aggregated in this manner, skip
        # (e.g. BERT and last token "aggregation")
        # if keys is None, any aggregation step specified will be applied regardless of model_name
        # (e.g., mean)
        if keys and model_name not in keys: 
            continue

        # map from method --> layer (int) --> pooled representation per model
        # this entity still needs to be flattened
        model_pooled = defaultdict(dict)

        for layer in token_embeddings[model_name]:

            # all the embeddings corresponding to the tokens
            all_embeds = [e for i, (t, e) in enumerate(zip(sentence.tokens, token_embeddings[model_name][layer]))]
            all_tokens = [t for i, (t, e) in enumerate(zip(sentence.tokens, token_embeddings[model_name][layer]))]
            all_embeds = np.array(all_embeds, dtype=np.float32)
            all_tokens = np.array(all_tokens, dtype=str)
            # print(all_tokens, all_embeds.shape)

            # exclude OOV words' embeddings (they are all NaNs)
            not_nan_tokens = all_tokens[~np.isnan(all_embeds[:, 0])   ]
            not_nan_embeds = all_embeds[~np.isnan(all_embeds[:, 0]), :]

            # make a note of the shape of the vector (n x embed_dim)
            shape = not_nan_embeds.shape
            # TODO: vectorize operation on all tokensnow that it is an numpy array using apply()?
            if filters:
                mask = [all(fn(i, t) for fn_name, fn in filters.items()) for i, t in enumerate(not_nan_tokens)]
            else:
                mask = slice(None, None, None)

            filtered_embeds = not_nan_embeds[mask]
            filtered_shape = filtered_embeds.shape

            # if filtering left no tokens, we will use all_embeds instead
            if filtered_shape[0] == 0:
                io.log(f'filtered embeddings for current sentence are empty. retrying without filters: {sentence.tokens}', type='WARN')

                # now what? use unfiltered (as a fallback)
                filtered_embeds = not_nan_embeds
                
            # [very rarely] if no word has a corresponding embedding, then we have no choice
            # but to return a zero vector (or, sometime in the future, a random vector??)
            if shape[0] == 0:
                filtered_embeds = np.zeros((1, shape[-1]))

            for method in methods:
                # if a pre-defined aggregation method is used, apply it
                if type(method) is str:
                    method_name = method
                    if method_name == 'median':
                        pooled = np.median(filtered_embeds, axis=0).reshape(-1)#.tolist()
                    elif method_name == 'mean':
                        pooled = filtered_embeds.mean(axis=0).reshape(-1) #.tolist()
                    elif method_name == 'last':
                        pooled = filtered_embeds[-1, :].reshape(-1) #.tolist()
                    elif method_name == 'first':
                        pooled = filtered_embeds[0, :].reshape(-1) #.tolist()
                    else:
                        raise ValueError(f'unknown pooling method identifier: {method}')
                # handle the case where a custom aggregation function is applied to the embeddings
                elif type(method) is tuple:
                    method_name, fn = method
                    pooled = fn(filtered_embeds).reshape(-1) #.tolist()

                else:
                    raise ValueError(method)

                model_pooled[method_name][layer] = pooled

        for method_name, layer_wise_reprs in model_pooled.items():
            all_pooled[model_name][method_name] = flatten_activations(layer_wise_reprs)
            all_pooled[model_name][method_name].index = [sentence.uid]

    return all_pooled