Module sentspace.utils.io
Expand source code
from genericpath import exists
import os
import pickle
import textwrap
import datetime # from datetime import date
from hashlib import sha1
from pathlib import Path
from sys import stderr, stdout
import pandas as pd
import numpy as np
import sentspace.utils
from sentspace.utils.caching import cache_to_disk, cache_to_mem
from sentspace.utils.s3 import load_feature
from tqdm import tqdm
from sentspace.Sentence import Sentence
# from sentspace.utils.sanity_checks import sanity_check_databases
def dump_features(): pass
def create_output_paths(input_file:str, output_dir:str, calling_module=None, stop_words_file:str=None) -> Path:
"""Creates an output directory structure to output the results of the pipeline based on the supplied input file
Args:
input_file (str): [description]
output_dir (str): [description]
calling_module ([type], optional): [description]. Defaults to None.
stop_words_file (str, optional): [description]. Defaults to None.
Returns:
Path: [description]
"""
output_dir = Path(output_dir).expanduser().resolve()
# output will be organized based on its respective input file,
    # together with a hash of the contents (to avoid the case where two files are
# named similarly to one another but contain different contents)
out_file_name = '.'.join(os.path.basename(input_file).split('.')[:-1])
output_dir /= out_file_name
# TODO: is this necessary? adds a lot of confusion.
# output_dir /= sentspace.utils.md5(input_file)
output_dir.mkdir(parents=True, exist_ok=True)
with (output_dir / 'md5.txt').open('w') as f:
f.write(f'{Path(input_file).resolve()} md5sum:\t' + sentspace.utils.md5(input_file))
with (output_dir / 'run_history.txt').open('a+') as f:
f.write(datetime.datetime.now().strftime('run: %Y-%m-%d %H:%M:%S %Z\n'))
output_dir /= calling_module or '' # to subdir it by "lexical" or "syntax" etc.
# output_dir /= date.today().strftime('run_%Y-%m-%d')
output_dir.mkdir(parents=True, exist_ok=True)
return output_dir
def read_sentences(filename: str, stop_words_file: str = None):
"""reads sentences from a file, one per line, filtering
for stopwords
Args:
filename (str): path to input file containing sentences
stop_words_file (str, optional): path to file containing stopwords
Returns:
list[list]: list of the tokens from each sentences, nested as a list
list: list of sentences
"""
    if stop_words_file:
        # stop-word filtering is not implemented yet; the load below is unreachable
        raise NotImplementedError()
        stop_words = set(np.loadtxt(stop_words_file, delimiter='\t', unpack=False, dtype=str))
if filename.endswith('.txt'):
UIDs = []
sentences = []
with open(filename, 'r') as f:
UID_prefix = f'{filename[-8:]:#>10}' + '_' + sentspace.utils.md5(filename)[-5:]
for i, line in enumerate(f):
uid = UID_prefix + '_' + f'{len(UIDs):0>5}'
UIDs += [uid]
s = Sentence(line, uid)
if s:
sentences.append(s)
return sentences
elif filename.endswith('.pkl'):
df = pd.read_pickle(filename)
elif filename.endswith('.csv'):
df = pd.read_csv(filename, sep=',')
elif filename.endswith('.tsv'):
df = pd.read_csv(filename, sep='\t')
else:
raise ValueError('unknown type of file supplied (must be txt/pkl/csv/tsv. if pickle, must be a dataframe object)')
# try to figure out what to use as a unique identifier for the sentence
if 'corpora_identifier' in df.columns:
UIDs = df['corpora_identifier'].tolist()
elif 'index' in df.columns:
UIDs = df['index'].tolist()
else:
try:
UIDs = df.index.tolist()
except AttributeError:
            raise ValueError('does your dataframe have a unique index for each sentence?')
sentences = [Sentence(raw, uid) for raw, uid in zip(df['sentence'].tolist(), UIDs)]
return sentences
def get_batches(iterable, batch_size:int,
limit:float=float('inf'), offset:int=0):
"""
splits iterable into batches of size batch_size
"""
batch = []
count = 0
for i, item in enumerate(iterable):
# skip till we hit the offset
if i < offset: continue
batch.append(item)
count += 1
# batch size reached, or limit reached
if (i + 1) % batch_size == 0 or count >= limit:
yield batch
batch = []
# if we have accumulated enough items, return
if count >= limit:
return
if batch:
yield batch
def log(message, type='INFO'):
class T:
HEADER = '\033[95m'
OKBLUE = '\033[94m'
OKCYAN = '\033[96m'
OKGREEN = '\033[92m'
WARNING = '\033[93m'
FAIL = '\033[91m'
ENDC = '\033[0m'
BOLD = '\033[1m'
UNDERLINE = '\033[4m'
if type == 'INFO':
c = T.OKCYAN
elif type == 'EMPH':
c = T.OKGREEN
elif type == 'WARN':
c = T.BOLD + T.WARNING
elif type == 'ERR':
c = '\n' + T.BOLD + T.FAIL
else:
c = T.OKBLUE
timestamp = f'{sentspace.utils.time() - sentspace.utils.START_TIME():.2f}s'
lines = textwrap.wrap(message+T.ENDC,
width=120,
initial_indent = c + '%'*4 + f' [{type} @ {timestamp}] ',
subsequent_indent='.'*20+' ')
tqdm.write('\n'.join(lines), file=stderr)
# print(*lines, sep='\n', file=stderr)
Functions
def create_output_paths(input_file: str, output_dir: str, calling_module=None, stop_words_file: str = None) -> pathlib.Path
Creates an output directory structure for the results of the pipeline, based on the supplied input file.
Args
    input_file : str
        path to the input file; its basename (without extension) names the output subdirectory, and its md5 hash is recorded in md5.txt
    output_dir : str
        root directory under which output is organized
    calling_module : str, optional
        name of the calling pipeline module (e.g. "lexical" or "syntax"), used as a further subdirectory. Defaults to None.
    stop_words_file : str, optional
        currently unused. Defaults to None.
Returns
    Path
        the resolved output directory (created if it does not already exist)
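For illustration, a minimal usage sketch; the paths and module name below are hypothetical, and the input file must exist so that its md5 hash can be computed:

from sentspace.utils.io import create_output_paths

# hypothetical paths: output is grouped under the input file's basename,
# and md5.txt / run_history.txt are written alongside it
out_dir = create_output_paths('stimuli/sentences.txt', '~/sentspace_output',
                              calling_module='lexical')
print(out_dir)  # e.g. /home/user/sentspace_output/sentences/lexical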
def dump_features()
Not yet implemented; currently a no-op.
def get_batches(iterable, batch_size: int, limit: float = inf, offset: int = 0)
Splits an iterable into batches of size batch_size, skipping the first offset items and stopping once limit items have been collected.
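For illustration, how get_batches behaves on a small list (the values are arbitrary):

from sentspace.utils.io import get_batches

items = list(range(10))

# plain batching: [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
print(list(get_batches(items, batch_size=3)))

# skip the first 4 items and stop once 5 items have been collected: [[4, 5], [6, 7, 8]]
# (batch boundaries follow the item's position in the original iterable,
# not the count within the current batch)
print(list(get_batches(items, batch_size=3, offset=4, limit=5)))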
def log(message, type='INFO')
Writes a colour-coded, timestamped message to stderr via tqdm.write. type selects the formatting: 'INFO' (cyan), 'EMPH' (green), 'WARN' (bold yellow), 'ERR' (bold red); any other value is printed in blue.
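A short usage sketch (the messages are made up); output goes to stderr through tqdm.write, prefixed with the elapsed time since sentspace.utils.START_TIME():

from sentspace.utils.io import log

log('loaded input sentences')                   # cyan INFO line
log('cache miss, recomputing', type='WARN')     # bold yellow warning
log('failed to parse input file', type='ERR')   # bold red error, preceded by a newline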
def read_sentences(filename: str, stop_words_file: str = None)
Reads sentences from a file and wraps each one in a Sentence object. Plain-text (.txt) files are read one sentence per line; .pkl/.csv/.tsv files must contain a 'sentence' column.
Args
    filename : str
        path to the input file containing sentences
    stop_words_file : str, optional
        path to a file containing stopwords; stop-word filtering is not implemented yet
Returns
    list
        list of Sentence objects, one per input sentence
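For illustration, a usage sketch assuming the (hypothetical) input files exist; tabular inputs must provide a 'sentence' column:

from sentspace.utils.io import read_sentences

# plain-text input: one sentence per line; UIDs are derived from the
# filename and the last characters of its md5 hash
sentences = read_sentences('stimuli/sentences.txt')

# tabular input: UIDs come from a 'corpora_identifier' or 'index' column
# if present, otherwise from the dataframe index
sentences = read_sentences('stimuli/sentences.csv')

print(len(sentences), 'sentences loaded')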