Module sentspace.utils.misc
Expand source code
import concurrent.futures
import hashlib
from functools import partial
from itertools import chain
from time import time
# import seaborn as sns
import nltk
# import matplotlib.pyplot as plt
import numpy as np
import scipy.io as sio
from tqdm import tqdm
# Wall-clock timestamp (seconds since the epoch) captured once, when this
# module is first imported.
_START_TIME = time()


def START_TIME():
    """Return the timestamp recorded when this module was imported.

    Returns:
        float: the value of ``time()`` captured at import; every call
            returns the same number.
    """
    return _START_TIME
# download NLTK data if not already downloaded
def download_nltk_resources():
    """Make sure the NLTK resources this package relies on are available.

    Each resource is first looked up with ``nltk.data.find``; only when that
    lookup raises ``LookupError`` is ``nltk.download`` invoked for it.
    """
    required = (
        ("taggers", "averaged_perceptron_tagger"),
        ("corpora", "wordnet"),
        ("tokenizers", "punkt"),
    )
    for category, nltk_resource in required:
        resource_path = category + "/" + nltk_resource
        try:
            nltk.data.find(resource_path)
        except LookupError:
            try:
                nltk.download(nltk_resource)
            except FileExistsError:
                # NOTE(review): presumably tolerates another process creating
                # the same files concurrently -- confirm.
                pass
def md5(fname_or_raw, raw=False) -> str:
    """Compute an MD5 hex digest of a file's contents or of a raw string.

    Args:
        fname_or_raw (str): path of the file to hash, or, when ``raw`` is
            true, the string to hash directly.
        raw (bool, optional): when true, hash the UTF-8 encoding of
            ``fname_or_raw`` itself instead of opening it as a file.
            Defaults to False.

    Returns:
        str: hexadecimal MD5 digest.
    """
    digest = hashlib.md5()
    if not raw:
        # Read the file in 4 KiB pieces so it never has to fit in memory.
        with open(fname_or_raw, "rb") as handle:
            while True:
                block = handle.read(4096)
                if block == b"":
                    break
                digest.update(block)
    else:
        digest.update(fname_or_raw.encode("utf-8"))
    return digest.hexdigest()
def sha1(ob):
    """Return the SHA-1 hex digest of the UTF-8 encoded ``repr`` of ``ob``.

    Args:
        ob: any object; only its ``repr()`` text is hashed.

    Returns:
        str: hexadecimal SHA-1 digest.
    """
    encoded_repr = repr(ob).encode("utf-8")
    return hashlib.sha1(encoded_repr).hexdigest()
def parallelize(
    function, *iterables, wrap_tqdm=True, desc="", max_workers=None, **kwargs
):
    """Map a function over the supplied iterables using a process pool.

    ``function`` is bound to the (static) ``kwargs`` with ``functools.partial``
    and then mapped over ``iterables`` with ``ProcessPoolExecutor.map``, so it
    is called once per tuple of items drawn in lockstep from the iterables.

    Args:
        function (callable): function to call; it and its arguments must be
            picklable because they are sent to worker processes.
        *iterables: one or more iterables supplying positional arguments.
        wrap_tqdm (bool, optional): show a tqdm progress bar while results
            are collected. Defaults to True.
        desc (str, optional): text appended to the ``"[parallelized] "``
            progress-bar label. Defaults to "".
        max_workers (int, optional): number of worker processes handed to
            ``ProcessPoolExecutor``; ``None`` lets the executor choose.
            Defaults to None.
        **kwargs: keyword arguments passed unchanged to every call.

    Returns:
        list: the results, in input order.

    Raises:
        ValueError: if ``max_workers`` is not a positive integer (raised by
            ``ProcessPoolExecutor``).
    """
    partialfn = partial(function, **kwargs)
    # Honour the caller's max_workers instead of always passing None.
    with concurrent.futures.ProcessPoolExecutor(max_workers=max_workers) as executor:
        results = executor.map(partialfn, *iterables)
        if wrap_tqdm:
            # map() stops at the shortest iterable; fall back to an unknown
            # total when there are no iterables or one of them has no len().
            try:
                total = min(len(iterable) for iterable in iterables)
            except (TypeError, ValueError):
                total = None
            results = tqdm(results, total=total, desc="[parallelized] " + desc)
        # Materialize while the pool is still open so both code paths return
        # the same type (a list) rather than a lazy iterator.
        return list(results)
# this might be data-dependent
def load_passage_labels(filename):
    """Load the passage number of every sentence from a .mat file.

    Args:
        filename: path of a MATLAB file that contains the variable
            ``labelsPassageForEachSentence``.

    Returns:
        numpy.ndarray: that variable flattened to one dimension.
    """
    mat_contents = sio.loadmat(filename)
    return mat_contents["labelsPassageForEachSentence"].flatten()
# this might be data-dependent
def load_passage_categories(filename):
    """Load the list of passage category labels from a .mat file.

    Args:
        filename: path of a MATLAB file that contains the variable
            ``keyPassageCategory``.

    Returns:
        list: entries of the first row of that variable, horizontally
            stacked into one flat sequence.
    """
    mat_contents = sio.loadmat(filename)
    first_row = mat_contents["keyPassageCategory"][0]
    stacked = np.hstack(first_row)
    return list(stacked)
def get_passage_labels(f1g, lplst):
    """Expand per-sentence passage numbers into per-word passage numbers.

    Args:
        f1g: iterable of sentences, each an iterable of words.
        lplst: sequence whose i-th entry is the passage number of the i-th
            sentence in ``f1g``.

    Returns:
        list: one entry per word, equal to the passage number of the
            sentence that contains the word.
    """
    return [
        lplst[sentence_idx]
        for sentence_idx, sentence in enumerate(f1g)
        for _ in sentence
    ]
# this might be data-dependent
def load_passage_category(filename):
    """Load the category number of every passage from a .mat file.

    Args:
        filename: path of a MATLAB file that contains the variable
            ``labelsPassageCategory``.

    Returns:
        list: the values of that variable, flattened by one nesting level
            (row after row).
    """
    mat_contents = sio.loadmat(filename)
    # Splitting into a single section yields a one-element list holding the
    # whole array; wrapping it back into an array adds a leading axis of
    # length 1, which is dropped again by taking element 0.
    sections = np.hsplit(mat_contents["labelsPassageCategory"], 1)
    rows = np.array(sections).tolist()[0]
    return [value for row in rows for value in row]
def merge_lists(list_a, list_b, feature=""):
    """Fill NaN entries of one list with the matching entries of another.

    Args:
        list_a: primary values; each must be accepted by ``np.isnan``.
        list_b: fallback values, paired positionally with ``list_a``.
        feature (str, optional): not used by this function. Defaults to "".

    Returns:
        list: for every pair, the value from ``list_a`` unless it is NaN, in
            which case the value from ``list_b`` (which may itself be NaN).
            Pairing stops at the end of the shorter input.
    """
    return [
        fallback if np.isnan(primary) else primary
        for primary, fallback in zip(list_a, list_b)
    ]
def sizeof_fmt(num, suffix="B"):
    """Format a byte count as a human-readable string with binary prefixes.

    Can be used to print out how big a file is.
    By Fred Cirera, https://stackoverflow.com/a/1094933/1870254, modified.

    Args:
        num: the size to format.
        suffix (str, optional): unit text appended after the prefix.
            Defaults to "B".

    Returns:
        str: the value with one decimal place, a space, the binary prefix
            ("", "Ki", ... "Yi") and the suffix, e.g. ``"1.5 KiB"``.
    """
    binary_prefixes = ("", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "Zi")
    for prefix in binary_prefixes:
        if abs(num) < 1024.0:
            return "%3.1f %s%s" % (num, prefix, suffix)
        # Too large for this prefix: scale down and try the next one.
        num /= 1024.0
    # Anything still >= 1024 after "Zi" is reported in "Yi".
    return "%.1f %s%s" % (num, "Yi", suffix)
Functions
def START_TIME()
-
Expand source code
def START_TIME(): return _START_TIME
def download_nltk_resources()
-
Expand source code
def download_nltk_resources(): for category, nltk_resource in [ ("taggers", "averaged_perceptron_tagger"), ("corpora", "wordnet"), ("tokenizers", "punkt"), ]: try: nltk.data.find(category + "/" + nltk_resource) except LookupError as e: try: nltk.download(nltk_resource) except FileExistsError: pass
def get_passage_labels(f1g, lplst)
-
Given list of passage no. for each sentence, return list of passage no. (for each word)
Expand source code
def get_passage_labels(f1g, lplst): """ Given list of passage no. for each sentence, return list of passage no. (for each word) """ lplst_word = [] for i, sentence in enumerate(f1g): for word in sentence: lplst_word.append(lplst[i]) return lplst_word
def load_passage_categories(filename)
-
Given .mat file, load and return list of passage category labels
Expand source code
def load_passage_categories(filename): """ Given .mat file, load and return list of passage category labels """ labelsPassages = sio.loadmat(filename) lP = labelsPassages["keyPassageCategory"] return list(np.hstack(lP[0]))
def load_passage_category(filename)
-
Given .mat file, return category no. for each passage
Expand source code
def load_passage_category(filename): """ Given .mat file, return category no. for each passage """ labelsPassageCategory = sio.loadmat(filename) lPC = labelsPassageCategory["labelsPassageCategory"] lPC = np.hsplit(lPC, 1) lpclst = np.array(lPC).tolist() lpclst = lpclst[0] lpclst = list(chain.from_iterable(lpclst)) # Accessing the nested lists return lpclst
def load_passage_labels(filename)
-
Given .mat file, load and return list of passage no. for each sentence
Expand source code
def load_passage_labels(filename): """ Given .mat file, load and return list of passage no. for each sentence """ labelsPassages = sio.loadmat(filename) lP = labelsPassages["labelsPassageForEachSentence"] return lP.flatten()
def md5(fname_or_raw, raw=False) ‑> str
-
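Generates the md5sum of the contents of the file at fname_or_raw, or of the string fname_or_raw itself when raw is true. fname_or_raw (str): path to the file (or the raw string) whose md5sum we want.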
Expand source code
def md5(fname_or_raw, raw=False) -> str: """generates md5sum of the contents of fname fname (str): path to file whose md5sum we want """ hash_md5 = hashlib.md5() if raw: chunk = fname_or_raw.encode("utf-8") hash_md5.update(chunk) return hash_md5.hexdigest() with open(fname_or_raw, "rb") as f: for chunk in iter(lambda: f.read(4096), b""): hash_md5.update(chunk) return hash_md5.hexdigest()
def merge_lists(list_a, list_b, feature='')
-
Input: Two lists with potentially missing values. Return: If list 1 contains NA vals, the NA val is replaced by the value in list 2 (either numerical val or np.nan again)
Expand source code
def merge_lists(list_a, list_b, feature=""): """Input: Two lists with potentially missing values. Return: If list 1 contains NA vals, the NA val is replaced by the value in list 2 (either numerical val or np.nan again) """ merged = [] for val1, val2 in zip(list_a, list_b): merged += [val2 if np.isnan(val1) else val1] return merged
def parallelize(function, *iterables, wrap_tqdm=True, desc='', max_workers=None, **kwargs)
-
parallelizes a function by calling it on the supplied iterables and (static) kwargs. optionally wraps in tqdm for progress visualization
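Args
function : callable - the function to call on the items drawn from the iterables; the static kwargs are bound to it first.
wrap_tqdm : bool, optional - whether to wrap the results in a tqdm progress bar. Defaults to True.
desc : str, optional - text appended to the "[parallelized] " progress-bar label. Defaults to ''.
Returns
list of results when wrap_tqdm is true; otherwise the iterator returned by executor.map.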
Expand source code
def parallelize( function, *iterables, wrap_tqdm=True, desc="", max_workers=None, **kwargs ): """parallelizes a function by calling it on the supplied iterables and (static) kwargs. optionally wraps in tqdm for progress visualization Args: function ([type]): [description] wrap_tqdm (bool, optional): [description]. Defaults to True. desc ([type], optional): [description]. Defaults to None. Returns: [type]: [description] """ partialfn = partial(function, **kwargs) with concurrent.futures.ProcessPoolExecutor(max_workers=None) as executor: if wrap_tqdm: return [ *tqdm( executor.map(partialfn, *iterables), total=len(iterables[0]), desc="[parallelized] " + desc, ) ] return executor.map(partialfn, *iterables)
def sha1(ob)
-
Expand source code
def sha1(ob): ob_repr = repr(ob) hash_object = hashlib.sha1() hash_object.update(ob_repr.encode("utf-8")) return hash_object.hexdigest()
def sizeof_fmt(num, suffix='B')
-
This function can be used to print out how big a file is
Expand source code
def sizeof_fmt(num, suffix="B"): """ This function can be used to print out how big a file is """ """ by Fred Cirera, https://stackoverflow.com/a/1094933/1870254, modified""" for unit in ["", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "Zi"]: if abs(num) < 1024.0: return "%3.1f %s%s" % (num, unit, suffix) num /= 1024.0 return "%.1f %s%s" % (num, "Yi", suffix)