Coverage for limitys/limitys.py: 20.87% (85 statements)
1"""Overlap (Finnish: limitys) assesses sentences from constrained and overlapping vocabularies."""
3import argparse
4import datetime as dti
5import json
6import logging
7import pathlib
8import sys
9from typing import Dict, Tuple, Union, no_type_check
11import yaml
12from gensim import downloader as model_api # type: ignore
13from gensim.corpora import Dictionary # type: ignore
14from gensim.models import TfidfModel # type: ignore
15from gensim.similarities import SparseTermSimilarityMatrix, WordEmbeddingSimilarityIndex # type: ignore
17from limitys import ENCODING, log, stop
19PathLike = Union[str, pathlib.Path]
20Documents = Dict[str, Dict[str, str]]
21Verification = Tuple[bool, str]
23MODELS = {
24 'news': 'word2vec-google-news-300',
25 'wiki': 'fasttext-wiki-news-subwords-300',
26}
DEFAULT_MODEL = MODELS['news']
DEFAULT_DOCUMENTS_NAME = 'documents.yml'

def load_documents(path: PathLike = DEFAULT_DOCUMENTS_NAME) -> Documents:
    """Load the keyed documents from the file per convention and format (from suffix)."""
    source = pathlib.Path(path)
    log.debug(f'- documents suffix is {source.suffix}')

    if source.suffix.lower() in ('.yaml', '.yml'):
        log.debug('- reading documents as yaml')
        with open(source, 'rt', encoding=ENCODING) as handle:
            return yaml.safe_load(handle)  # type: ignore

    if source.suffix.lower() in ('.json',):
        log.debug('- reading documents as json')
        with open(source, 'rt', encoding=ENCODING) as handle:
            return json.load(handle)  # type: ignore

    return {}
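
# A minimal usage sketch (hypothetical file content; the real documents.yml may
# carry a richer structure, cf. the Documents type alias above). Given a YAML
# file with keyed text such as {alpha: 'some text', bravo: 'other text'}:
#
#     docs = load_documents('documents.yml')
#     # -> {'alpha': 'some text', 'bravo': 'other text'}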

@no_type_check
def similarity(options: argparse.Namespace) -> int:
    """Drive the verification."""
    documents = pathlib.Path(options.documents)
    language = options.language
    language_code = stop.language_code_of(language)
    quiet = options.quiet
    if quiet:
        logging.getLogger().setLevel(logging.ERROR)

    start_time = dti.datetime.now(tz=dti.timezone.utc)
    log.info(f'starting similarity analysis of {language} (code {language_code}) documents in ({documents})')
    log.info(f'output channel is {"STDOUT" if options.out_path is sys.stdout else options.out_path}')
    docs = load_documents(documents)
    log.info(f'- loaded {len(docs)} documents from ({documents})')

    if not docs:
        log.error('no documents to analyze')
        return 1
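
    # NOTE: the cleansing below hardcodes the English stopword set (stop.EN),
    # so the language_code derived from options.language does not yet steer it.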
    sentences = {k: stop.cleanse(docs[k], stop.EN) for k in docs}
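
    # Dictionary assigns every distinct token an integer id; doc2bow then maps
    # a token list onto a sparse bag-of-words vector of (token_id, count) pairs.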
    dictionary = Dictionary(list(sentences.values()))
    bags_of_words = {k: dictionary.doc2bow(sentences[k]) for k in sentences}
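
    # Fit TF-IDF weights on the corpus of bags, then re-weight every document
    # so that rare terms carry more weight than ubiquitous ones.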
    tfidf = TfidfModel(list(bags_of_words.values()))
    tfidfs = {k: tfidf[bags_of_words[k]] for k in bags_of_words}
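
    # Fetch the pretrained fastText vectors through gensim's downloader API; the
    # first call triggers a sizable download that is then cached under ~/gensim-data.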
    model = model_api.load(MODELS['wiki'])
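
    # Soft cosine machinery: the index derives word-to-word similarities from
    # the embeddings, and the sparse matrix materializes them over the corpus
    # vocabulary (the TF-IDF model helps prioritize which term pairs to keep).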
    termsim_index = WordEmbeddingSimilarityIndex(model)
    termsim_matrix = SparseTermSimilarityMatrix(termsim_index, dictionary, tfidf)
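
    # Pairwise pass over all document combinations: with normalized=(True, True)
    # the inner product is the soft cosine similarity of the two documents, and
    # only cells strictly above the diagonal are kept (the measure is symmetric
    # and every document is maximally similar to itself).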
    pbm = []
    row_heads = []
    col_heads = list(tfidfs)
    for row, (i, bag) in enumerate(tfidfs.items()):
        pbm.append([])
        row_heads.append(i)
        for col, (j, bah) in enumerate(tfidfs.items()):
            score = termsim_matrix.inner_product(bag, bah, normalized=(True, True))
            pbm[row].append(score if col > row else None)
            log.debug(f'{i=}, {j=}')
            log.debug(docs[i])
            log.debug(docs[j])
            log.debug('similarity = %.4f' % score)
            log.debug('# ' + '- ' * 42)
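
    # Render the strict upper triangle as fixed-width text: column heads first,
    # then a dashed rule, then one row per document (except the last) with its
    # key as the row label.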
    matrix_rep = ['# Matrix:', '']
    matrix_rep.append(f'{" ".join(cell.rjust(12) for cell in col_heads[1:])}{" " * 12}')
    matrix_rep.append(f'{" ".join(("-" * 11).rjust(12) for _ in row_heads[1:])}{" " * 12}')
    for rank, row in enumerate(pbm[:-1]):
        upp_tri_mat_row = ' '.join(str('' if cell is None else round(cell, 3)).rjust(12) for cell in row[1:])
        matrix_rep.append(f'{upp_tri_mat_row} | {row_heads[rank]:12s}')
    if options.out_path is sys.stdout:
        log.info('- writing similarity upper triangle matrix to STDOUT')
        print('\n'.join(matrix_rep))
    else:
        out = pathlib.Path(options.out_path)
        log.info(f'- writing similarity upper triangle matrix to {out}')
        with open(out, 'wt', encoding=ENCODING) as handle:
            handle.write('\n'.join(matrix_rep) + '\n')

    end_time = dti.datetime.now(tz=dti.timezone.utc)
    log.info(f'similarity analysis complete after {(end_time - start_time).total_seconds()} seconds')
    return 0
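
# A minimal driver sketch, assuming only the option attributes read above; the
# real CLI wiring (argument parsing, defaults, validation) lives elsewhere:
#
#     import argparse
#     import sys
#     from limitys.limitys import similarity
#
#     options = argparse.Namespace(
#         documents='documents.yml',  # path handed to load_documents()
#         language='english',         # resolved via stop.language_code_of()
#         quiet=False,                # True raises the log level to ERROR
#         out_path=sys.stdout,        # or a file system path for file output
#     )
#     sys.exit(similarity(options))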