Coverage for limitys/limitys.py: 20.87%

85 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2024-02-04 20:06:37 +00:00

1"""Overlap (Finnish: limitys) assesses sentences from constrained and overlapping vocabularies.""" 

2 

3import argparse 

4import datetime as dti 

5import json 

6import logging 

7import pathlib 

8import sys 

9from typing import Dict, Tuple, Union, no_type_check 

10 

11import yaml 

12from gensim import downloader as model_api # type: ignore 

13from gensim.corpora import Dictionary # type: ignore 

14from gensim.models import TfidfModel # type: ignore 

15from gensim.similarities import SparseTermSimilarityMatrix, WordEmbeddingSimilarityIndex # type: ignore 

16 

17from limitys import ENCODING, log, stop 

18 

# Type aliases used in the signatures below.
PathLike = Union[str, pathlib.Path]  # anything pathlib.Path() accepts
Documents = Dict[str, Dict[str, str]]  # keyed documents as loaded from YAML/JSON
Verification = Tuple[bool, str]  # (passed, message) pair

# Word-embedding models fetchable through gensim's downloader API,
# keyed by a short human-friendly alias.
MODELS = {
    'news': 'word2vec-google-news-300',
    'wiki': 'fasttext-wiki-news-subwords-300',
}
# NOTE(review): DEFAULT_MODEL is not referenced anywhere in this file;
# similarity() loads MODELS['wiki'] directly — confirm which model is
# actually intended as the default.
DEFAULT_MODEL = MODELS['news']
DEFAULT_DOCUMENTS_NAME = 'documents.yml'

29 

30 

def load_documents(path: PathLike = DEFAULT_DOCUMENTS_NAME) -> Documents:
    """Load the keyed documents from the file per convention and format (from suffix).

    The format is selected by the (case-insensitive) file suffix:
    ``.yaml``/``.yml`` parse as YAML, ``.json`` parses as JSON, and any
    other suffix yields an empty mapping.
    """
    doc_file = pathlib.Path(path)
    suffix = doc_file.suffix.lower()
    log.debug(f'- documents suffix is {doc_file.suffix}')

    if suffix in ('.yaml', '.yml'):
        log.debug('- reading documents as yaml')
        loader = yaml.safe_load
    elif suffix == '.json':
        log.debug('- reading documents as json')
        loader = json.load
    else:
        # Unknown format - signal "nothing to analyze" rather than raising.
        return {}

    with open(doc_file, 'rt', encoding=ENCODING) as handle:
        return loader(handle)  # type: ignore

47 

48 

@no_type_check
def similarity(options: argparse.Namespace) -> int:
    """Drive the verification.

    Loads the keyed documents, builds TF-IDF bags over the cleansed
    sentences, computes pairwise soft-cosine similarities with a
    word-embedding term-similarity matrix, and renders the upper
    triangle of the similarity matrix to STDOUT or a file.

    Returns 0 on success and 1 when no documents could be loaded.
    Side effects: may lower the root logger to ERROR when --quiet is
    given; writes the matrix to options.out_path when it is not STDOUT.
    """
    documents = pathlib.Path(options.documents)
    language = options.language
    language_code = stop.language_code_of(language)
    quiet = options.quiet
    if quiet:
        # Quiet mode silences everything below ERROR on the root logger.
        logging.getLogger().setLevel(logging.ERROR)

    start_time = dti.datetime.now(tz=dti.timezone.utc)
    log.info(f'starting similarity analysis of {language} (code {language_code}) documents in ({documents})')
    log.info(f'output channel is {"STDOUT" if options.out_path is sys.stdout else options.out_path}')
    docs = load_documents(documents)
    log.info(f'- loaded {len(docs)} documents from ({documents})')

    if not docs:
        log.error('no documents to analyze')
        return 1

    # NOTE(review): stop.EN is hard-coded here although language_code was
    # derived from options.language above — confirm whether cleansing should
    # use the requested language's stop words instead.
    sentences = {k: stop.cleanse(docs[k], stop.EN) for k in docs}

    # Vocabulary over all documents, then one bag-of-words per document key.
    dictionary = Dictionary(list(sentences.values()))
    bags_of_words = {k: dictionary.doc2bow(sentences[k]) for k in sentences}

    tfidf = TfidfModel(list(bags_of_words.values()))
    tfidfs = {k: tfidf[bags_of_words[k]] for k in bags_of_words}

    # NOTE(review): loads MODELS['wiki'] directly while DEFAULT_MODEL
    # (MODELS['news']) exists at module level but is unused — confirm intent.
    # Downloads the embedding model on first use (network access).
    model = model_api.load(MODELS['wiki'])

    termsim_index = WordEmbeddingSimilarityIndex(model)
    termsim_matrix = SparseTermSimilarityMatrix(termsim_index, dictionary, tfidf)

    # Build the pairwise similarity matrix (pbm); only the strict upper
    # triangle (col > row) is kept, everything else is None.
    pbm = []
    row_heads = []
    col_heads = [key for key in tfidfs]
    for row, (i, bag) in enumerate(tfidfs.items()):
        pbm.append([])
        row_heads.append(i)
        for col, (j, bah) in enumerate(tfidfs.items()):
            # NOTE(review): this local deliberately-or-not shadows the
            # enclosing function name `similarity`.
            similarity = termsim_matrix.inner_product(bag, bah, normalized=(True, True))
            pbm[row].append(similarity if col > row else None)
            log.debug(f'{i=}, {j=}')
            log.debug(docs[i])
            log.debug(docs[j])
            log.debug('similarity = %.4f' % similarity)
            log.debug('# ' + '- ' * 42)

    # Render the upper-triangle matrix: a header row of column keys, a
    # dashed separator, then one row per document (last row omitted since
    # its upper triangle is empty), each cell right-justified to 12 chars.
    matrix_rep = ['# Matrix:', '']
    matrix_rep.append(f'{" ".join(cell.rjust(12) for cell in col_heads[1:])}{" "*12}')
    matrix_rep.append(f'{" ".join(("-"*11).rjust(12) for _ in row_heads[1:])}{" "*12}')
    for rank, row in enumerate(pbm[:-1]):
        upp_tri_mat_row = ' '.join(str('' if cell is None else round(cell, 3)).rjust(12) for cell in row[1:])
        matrix_rep.append(f'{upp_tri_mat_row} | {row_heads[rank] :12s}')
    if options.out_path is sys.stdout:
        log.info('- writing similarity upper triangle matrix to STDOUT')
        print('\n'.join(matrix_rep))
    else:
        out = pathlib.Path(options.out_path)
        log.info(f'- writing similarity upper triangle matrix to {out}')
        with open(out, 'wt', encoding=ENCODING) as handle:
            handle.write('\n'.join(matrix_rep) + '\n')

    end_time = dti.datetime.now(tz=dti.timezone.utc)
    log.info(f'similarity analysis complete after {(end_time - start_time).total_seconds()} seconds')
    return 0