Coverage for etiketti/implementation.py: 89.16%

197 statements  

coverage.py v7.4.1, created at 2024-02-04 17:54:24 +00:00

1"""Implementation API for labeling.""" 

2 

3import argparse 

4import datetime as dti 

5import pathlib 

6import shutil 

7import subprocess # nosec B404 

8import uuid 

9import warnings 

10from typing import no_type_check 

11 

12try: 

13 from liitos import APP_ALIAS as CREATOR_NAME, __version__ as CREATOR_VERSION # type: ignore 

14except ModuleNotFoundError: 

15 CREATOR_NAME = 'liitos' 

16 CREATOR_VERSION = '42' 

17 

18from etiketti.discover import ( 

19 extract_author, 

20 extract_meta_parts, 

21 get_producer, 

22 hash_file, 

23 load_conventions, 

24 load_label_context, 

25) 

26from etiketti import ( 

27 ENCODING, 

28 LOG_SEPARATOR, 

29 TS_FORMAT_PATCH, 

30 TS_FORMAT_PAYLOADS, 

31 TS_FORMAT_ISO, 

32 ContextType, 

33 ConventionsType, 

34 PathLike, 

35 log, 

36) 

37 

38warnings.filterwarnings('ignore') 

39 

40 

41@no_type_check 

42def log_subprocess_output(pipe, prefix: str): 

43 for line in iter(pipe.readline, b''): # b'\n'-separated lines 

44 cand = line.decode(encoding=ENCODING).rstrip() 

45 if cand.strip():  [45 ↛ 43: line 45 didn't jump to line 43, because the condition on line 45 was never false]

46 log.info(f'{prefix}: %s', cand) 

47 

48 

49def camelize_first_two(dashed_key: str) -> str: 

50 """Transform kebab-key-input into KebabKey.""" 

51 words = dashed_key.split('-') 

52 return f'{words[0].title()}{words[1].title()}' 

53 
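# Illustration (not part of the measured module): camelize_first_two keeps only the
# first two words of a kebab-case key, for example
#   camelize_first_two('kebab-key-input')    -> 'KebabKey'
#   camelize_first_two('metadata-yml-path')  -> 'MetadataYml'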

54 

55@no_type_check 

56def timestamp_patch(create_date: str, modify_date: str, path: PathLike) -> None: 

57 """Let exiftool patch the time fields.""" 

58 # on Linux the twisted c->m and m->c mapping below gives the correct timestamps

59 exiftool_command = f'exiftool "-CreateDate={create_date}" "-ModifyDate={modify_date}" {path}' 

60 log.info(LOG_SEPARATOR) 

61 log.info(f'{exiftool_command} ...') 

62 process = subprocess.Popen( 

63 exiftool_command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True # nosec B602 

64 ) 

65 with process.stdout: 

66 log_subprocess_output(process.stdout, 'timestamp-patch') 

67 return_code = process.wait() 

68 if return_code < 0:  [68 ↛ 69: line 68 didn't jump to line 69, because the condition on line 68 was never true]

69 log.error(f'==> Timestamp patch process ({exiftool_command}) was terminated by signal {-return_code}') 

70 else: 

71 log.info(f'==> Timestamp patch process ({exiftool_command}) returned {return_code}') 

72 log.info(LOG_SEPARATOR) 

73 

74 

75@no_type_check 

76def pdf_attributes(path: PathLike) -> None: 

77 """Let exiftool assess the attributes.""" 

78 exiftool_command = f'exiftool {path}' 

79 log.info(f'{exiftool_command} ...') 

80 process = subprocess.Popen( 

81 exiftool_command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True # nosec B602 

82 ) 

83 with process.stdout: 

84 log_subprocess_output(process.stdout, 'meta-state') 

85 return_code = process.wait() 

86 if return_code < 0:  [86 ↛ 87: line 86 didn't jump to line 87, because the condition on line 86 was never true]

87 log.error(f'==> Meta-state process ({exiftool_command}) was terminated by signal {-return_code}') 

88 else: 

89 log.info(f'==> Meta-state process ({exiftool_command}) returned {return_code}') 

90 log.info(LOG_SEPARATOR) 

91 

92 

93@no_type_check 

94def pdf_info(path: PathLike) -> None: 

95 """Let pdfinfo assess some attributes.""" 

96 pdfinfo_command = f'pdfinfo {path}' 

97 log.info(f'{pdfinfo_command} ...') 

98 process = subprocess.Popen( 

99 pdfinfo_command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True # nosec B602 

100 ) 

101 with process.stdout: 

102 log_subprocess_output(process.stdout, 'pdf-info') 

103 return_code = process.wait() 

104 if return_code < 0:  [104 ↛ 105: line 104 didn't jump to line 105, because the condition on line 104 was never true]

105 log.error(f'==> Pdfinfo process ({pdfinfo_command}) was terminated by signal {-return_code}') 

106 else: 

107 log.info(f'==> Pdfinfo process ({pdfinfo_command}) returned {return_code}') 

108 log.info(LOG_SEPARATOR) 

109 

110 

111@no_type_check 

112def cross_correlate(source: PathLike, conventions: ConventionsType, context: ContextType, target: PathLike) -> None: 

113 """Load information per conventions and mix with source to create target pdf.""" 

114 from pikepdf import Pdf # Workaround to avoid the start-up log of backend in version calls etc. 

115 

116 source = pathlib.Path(source) 

117 target = pathlib.Path(target) 

118 backend_version = 'cf. pdf.Producer' # backend_version # 'putki 2023.1.1' 

119 log.info('Retrieving producer information:') 

120 log.info(LOG_SEPARATOR) 

121 producer_version = get_producer() 

122 creator_version = f'{CREATOR_NAME} {CREATOR_VERSION}' 

123 

124 author = extract_author(conventions['approvals-yml-path']) 

125 title, subject, keywords = extract_meta_parts(conventions['metadata-yml-path']) 

126 dc_subject = subject 

127 dc_creator = [author] 

128 dc_title = title 

129 keywords = keywords 

130 hashes = {f'{camelize_first_two(k)}Hash': f'sha512:{hash_file(v)}' for k, v in conventions.items() if '-tex-' in k} 

131 for k, v in hashes.items(): 

132 log.info(f'- {k :17s} -> {v}') 

133 

134 label_prefix = context['label']['prefix'] 

135 label_site_id = context['label']['site-id'] 

136 label_action_id = context['label']['action-id'] 

137 classification = 'Internal' 

138 content_bits = '0' 

139 enabled = 'true' 

140 method = 'Privileged' 

141 

142 st = source.stat() 

143 c_time_patch = dti.datetime.fromtimestamp(st.st_ctime, tz=dti.timezone.utc).strftime(TS_FORMAT_PATCH) 

144 

145 m_time_iso = dti.datetime.fromtimestamp(st.st_mtime, tz=dti.timezone.utc).strftime(TS_FORMAT_ISO) 

146 m_time_patch = dti.datetime.fromtimestamp(st.st_mtime, tz=dti.timezone.utc).strftime(TS_FORMAT_PATCH) 

147 

148 make_unique = str(uuid.uuid4()) 

149 

150 with Pdf.open(source) as pdf: 

151 with pdf.open_metadata(set_pikepdf_as_editor=False) as m: 

152 m.load_from_docinfo(pdf.docinfo) 

153 m[f'{label_prefix}Enabled'] = enabled 

154 m[f'{label_prefix}SetDate'] = m_time_iso # was iso 

155 m[f'{label_prefix}Method'] = method 

156 m[f'{label_prefix}Name'] = classification 

157 m[f'{label_prefix}SiteId'] = label_site_id 

158 m[f'{label_prefix}ActionId'] = label_action_id 

159 m[f'{label_prefix}ContentBits'] = content_bits 

160 m['Classification'] = classification 

161 m['UniqueIdentity'] = make_unique 

162 m['xmp:CreateDate'] = c_time_patch # was iso 

163 m['xmp:ModifyDate'] = m_time_patch # was iso 

164 m['xmp:CreatorTool'] = creator_version 

165 m['pdf:Producer'] = producer_version 

166 m['dc:subject'] = dc_subject 

167 m['dc:title'] = dc_title 

168 m['dc:creator'] = dc_creator 

169 # m['dc:source'] = dc_source 

170 # m['SourceHash'] = source_hash 

171 for k, v in hashes.items(): 

172 m[k] = v 

173 pdf.docinfo[f'/{label_prefix}Enabled'] = m[f'{label_prefix}Enabled'] 

174 pdf.docinfo[f'/{label_prefix}SetDate'] = m[f'{label_prefix}SetDate'] 

175 pdf.docinfo[f'/{label_prefix}Method'] = m[f'{label_prefix}Method'] 

176 pdf.docinfo[f'/{label_prefix}Name'] = m[f'{label_prefix}Name'] 

177 pdf.docinfo[f'/{label_prefix}SiteId'] = m[f'{label_prefix}SiteId'] 

178 pdf.docinfo[f'/{label_prefix}ActionId'] = m[f'{label_prefix}ActionId'] 

179 pdf.docinfo[f'/{label_prefix}ContentBits'] = m[f'{label_prefix}ContentBits'] 

180 pdf.docinfo['/Classification'] = m['Classification'] 

181 pdf.docinfo['/UniqueIdentity'] = m['UniqueIdentity'] 

182 # pdf.docinfo['/Source'] = m['dc:source'] 

183 # pdf.docinfo['/SourceHash'] = m['SourceHash'] 

184 for k, v in hashes.items(): 

185 pdf.docinfo[f'/{k}'] = v 

186 pdf.docinfo['/Author'] = m['dc:creator'][0] 

187 pdf.docinfo['/CreationDate'] = m['xmp:CreateDate'] 

188 pdf.docinfo['/Creator'] = m['xmp:CreatorTool'] 

189 pdf.docinfo['/Producer'] = m['pdf:Producer'] 

190 pdf.docinfo['/Keywords'] = keywords 

191 pdf.docinfo['/ModDate'] = m['xmp:ModifyDate'] 

192 pdf.docinfo['/PTEX.FullBanner'] = backend_version 

193 pdf.docinfo['/Subject'] = m['dc:subject'] 

194 pdf.docinfo['/Title'] = m['dc:title'] 

195 if context.get('kv_pairs'):  [195 ↛ 196: line 195 didn't jump to line 196, because the condition on line 195 was never true]

196 for k, v in context['kv_pairs'].items(): 

197 pdf.docinfo[f'/Ctx{k}'] = v 

198 pdf.save(target, fix_metadata_version=False, linearize=True) 

199 

200 log.info(LOG_SEPARATOR) 

201 log.info('Patching the timestamps:') 

202 timestamp_patch(create_date=m_time_patch, modify_date=c_time_patch, path=target) 

203 

204 

205def patch(options: argparse.Namespace) -> int: 

206 """Patch the two related meta structures of the pdf file""" 

207 in_pdf = pathlib.Path(options.in_pdf) 

208 out_pdf = pathlib.Path(options.out_pdf) 

209 cfg_path = pathlib.Path(options.cfg_path) 

210 enforce = options.enforce 

211 log.info(f'Patching pdf meta data of {in_pdf}') 

212 log.info(f'Configuration path is {cfg_path}') 

213 log.info(f'Output path is {out_pdf}') 

214 if enforce:  [214 ↛ 215: line 214 didn't jump to line 215, because the condition on line 214 was never true]

215 log.warning(f'Labeling will be enforced by overwriting {in_pdf}') 

216 else: 

217 log.info(f'Labeling will NOT be enforced - {in_pdf} will not be modified') 

218 

219 start_time = dti.datetime.now(tz=dti.timezone.utc) 

220 start_ts = start_time.strftime(TS_FORMAT_PAYLOADS) 

221 log.info(f'Start timestamp ({start_ts})') 

222 

223 log.info(LOG_SEPARATOR) 

224 context = load_label_context(cfg_path) 

225 log.info('loaded label context:') 

226 for k, v in context['label'].items(): 

227 log.info(f'- {k :17s} -> {v}') 

228 log.info(LOG_SEPARATOR) 

229 

230 if options.kv_pairs:  [230 ↛ 231: line 230 didn't jump to line 231, because the condition on line 230 was never true]

231 log.info('key-value pairs from request for context kv_pairs:') 

232 context['kv_pairs'] = {} 

233 for k, v in options.kv_pairs.items():  [233 ↛ 234: line 233 didn't jump to line 234, because the loop on line 233 never started]

234 context['kv_pairs'][k] = v 

235 log.info(f'- {k :17s} -> {v}') 

236 log.info(LOG_SEPARATOR) 

237 

238 conventions = load_conventions(context, in_pdf) 

239 log.info('identified conventions:') 

240 for k, v in conventions.items(): 

241 log.info(f'- {k :17s} -> {v}') 

242 log.info(LOG_SEPARATOR) 

243 

244 log.info(LOG_SEPARATOR) 

245 log.info('PDF information from source file:') 

246 log.info(LOG_SEPARATOR) 

247 pdf_info(in_pdf) 

248 log.info('PDF attributes/labels from source file:') 

249 log.info(LOG_SEPARATOR) 

250 pdf_attributes(in_pdf) 

251 cross_correlate(source=in_pdf, conventions=conventions, context=context, target=out_pdf) 

252 log.info('PDF attributes/labels from target file:') 

253 log.info(LOG_SEPARATOR) 

254 pdf_attributes(out_pdf) 

255 log.info('PDF information from target file:') 

256 log.info(LOG_SEPARATOR) 

257 pdf_info(out_pdf) 

258 

259 if enforce:  [259 ↛ 260: line 259 didn't jump to line 260, because the condition on line 259 was never true]

260 log.warning(f'Enforcing labels by overwriting {in_pdf}') 

261 shutil.copy2(out_pdf, in_pdf) 

262 log.info(f'- {out_pdf} -> {in_pdf}') 

263 

264 end_time = dti.datetime.now(tz=dti.timezone.utc) 

265 end_ts = end_time.strftime(TS_FORMAT_PAYLOADS) 

266 duration_secs = (end_time - start_time).total_seconds() 

267 log.info(f'End timestamp ({end_ts})') 

268 log.info(f'Patched {in_pdf} document and wrote {out_pdf} in {duration_secs} secs') 

269 

270 return 0
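
A minimal caller sketch (illustrative only; the paths and configuration file name are hypothetical, while the attribute names mirror those read inside patch above):

import argparse
from etiketti.implementation import patch

options = argparse.Namespace(
    in_pdf='render/document.pdf',            # hypothetical source pdf to label
    out_pdf='render/document-labeled.pdf',   # hypothetical labeled target pdf
    cfg_path='etiketti.yml',                 # hypothetical label context configuration
    enforce=False,                           # True would copy out_pdf back over in_pdf
    kv_pairs={},                             # optional extra docinfo entries stored as /Ctx<key>
)
patch(options)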