Coverage for etiketti/implementation.py: 89.16%
197 statements
« prev ^ index » next coverage.py v7.4.1, created at 2024-02-04 17:54:24 +00:00
« prev ^ index » next coverage.py v7.4.1, created at 2024-02-04 17:54:24 +00:00
1"""Implementation API for labeling."""
3import argparse
4import datetime as dti
5import pathlib
6import shutil
7import subprocess # nosec B404
8import uuid
9import warnings
10from typing import no_type_check
12try:
13 from liitos import APP_ALIAS as CREATOR_NAME, __version__ as CREATOR_VERSION # type: ignore
14except ModuleNotFoundError:
15 CREATOR_NAME = 'liitos'
16 CREATOR_VERSION = '42'
18from etiketti.discover import (
19 extract_author,
20 extract_meta_parts,
21 get_producer,
22 hash_file,
23 load_conventions,
24 load_label_context,
25)
26from etiketti import (
27 ENCODING,
28 LOG_SEPARATOR,
29 TS_FORMAT_PATCH,
30 TS_FORMAT_PAYLOADS,
31 TS_FORMAT_ISO,
32 ContextType,
33 ConventionsType,
34 PathLike,
35 log,
36)
38warnings.filterwarnings('ignore')
@no_type_check
def log_subprocess_output(pipe, prefix: str):
    """Relay the non-blank lines of a subprocess pipe to the log.

    Reads `pipe` line by line until `readline` returns `b''`, decodes every line
    with ENCODING, strips trailing whitespace, and logs each line that is not
    blank at info level in the form '<prefix>: <line>'.

    Args:
        pipe: binary stream offering `readline` (for example `Popen.stdout`).
        prefix: label put in front of every relayed line.
    """
    for raw_line in iter(pipe.readline, b''):  # b'\n'-separated lines
        text = raw_line.decode(encoding=ENCODING).rstrip()
        if text.strip():
            # The prefix is passed as a logging argument instead of being interpolated
            # into the format string, so a '%' inside the prefix cannot be mistaken
            # for a format directive.
            log.info('%s: %s', prefix, text)
def camelize_first_two(dashed_key: str) -> str:
    """Transform kebab-key-input into KebabKey.

    Only the first two dash-separated words are used; each is converted with
    `str.title` and the results are concatenated. A key without any dash yields
    its single title-cased word (previously this raised IndexError).

    Args:
        dashed_key: key with words separated by '-'.

    Returns:
        The concatenated, title-cased first (up to) two words.
    """
    leading_words = dashed_key.split('-')[:2]
    return ''.join(word.title() for word in leading_words)
@no_type_check
def timestamp_patch(create_date: str, modify_date: str, path: PathLike) -> None:
    """Let exiftool patch the time fields.

    Runs exiftool with the -CreateDate and -ModifyDate options on `path`, relays
    the combined stdout/stderr of the tool to the log, and logs the exit status.

    Args:
        create_date: value handed to the -CreateDate option of exiftool.
        modify_date: value handed to the -ModifyDate option of exiftool.
        path: file exiftool is asked to patch.
    """
    # on linux the below twisted c->m and m->c mappings gives the correct timestamps
    # NOTE(review): presumably this refers to callers passing swapped values - confirm.
    exiftool_command = f'exiftool "-CreateDate={create_date}" "-ModifyDate={modify_date}" {path}'
    # The tool is started from an argument list without a shell, so a path containing
    # whitespace or shell metacharacters reaches exiftool as one unaltered argument.
    arguments = ['exiftool', f'-CreateDate={create_date}', f'-ModifyDate={modify_date}', str(path)]
    log.info(LOG_SEPARATOR)
    log.info(f'{exiftool_command} ...')
    try:
        process = subprocess.Popen(  # nosec B603 B607
            arguments, stdout=subprocess.PIPE, stderr=subprocess.STDOUT
        )
    except OSError as err:
        # Without a shell a missing or non-executable tool surfaces as an exception
        # instead of a shell exit code; report it and carry on like before.
        log.error(f'==> Timestamp patch process ({exiftool_command}) could not be started: {err}')
        log.info(LOG_SEPARATOR)
        return
    with process.stdout:
        log_subprocess_output(process.stdout, 'timestamp-patch')
    return_code = process.wait()
    if return_code < 0:
        # Popen reports termination by signal N as return code -N.
        log.error(f'==> Timestamp patch process ({exiftool_command}) was terminated by signal {-return_code}')
    else:
        log.info(f'==> Timestamp patch process ({exiftool_command}) returned {return_code}')
    log.info(LOG_SEPARATOR)
@no_type_check
def pdf_attributes(path: PathLike) -> None:
    """Let exiftool assess the attributes.

    Runs exiftool on `path`, relays the combined stdout/stderr of the tool to the
    log, and logs the exit status.

    Args:
        path: file exiftool is asked to inspect.
    """
    exiftool_command = f'exiftool {path}'
    # The tool is started from an argument list without a shell, so a path containing
    # whitespace or shell metacharacters reaches exiftool as one unaltered argument.
    arguments = ['exiftool', str(path)]
    log.info(f'{exiftool_command} ...')
    try:
        process = subprocess.Popen(  # nosec B603 B607
            arguments, stdout=subprocess.PIPE, stderr=subprocess.STDOUT
        )
    except OSError as err:
        # Without a shell a missing or non-executable tool surfaces as an exception
        # instead of a shell exit code; report it and carry on like before.
        log.error(f'==> Meta-state process ({exiftool_command}) could not be started: {err}')
        log.info(LOG_SEPARATOR)
        return
    with process.stdout:
        log_subprocess_output(process.stdout, 'meta-state')
    return_code = process.wait()
    if return_code < 0:
        # Popen reports termination by signal N as return code -N.
        log.error(f'==> Meta-state process ({exiftool_command}) was terminated by signal {-return_code}')
    else:
        log.info(f'==> Meta-state process ({exiftool_command}) returned {return_code}')
    log.info(LOG_SEPARATOR)
@no_type_check
def pdf_info(path: PathLike) -> None:
    """Let pdfinfo assess some attributes.

    Runs pdfinfo on `path`, relays the combined stdout/stderr of the tool to the
    log, and logs the exit status.

    Args:
        path: file pdfinfo is asked to inspect.
    """
    pdfinfo_command = f'pdfinfo {path}'
    # The tool is started from an argument list without a shell, so a path containing
    # whitespace or shell metacharacters reaches pdfinfo as one unaltered argument.
    arguments = ['pdfinfo', str(path)]
    log.info(f'{pdfinfo_command} ...')
    try:
        process = subprocess.Popen(  # nosec B603 B607
            arguments, stdout=subprocess.PIPE, stderr=subprocess.STDOUT
        )
    except OSError as err:
        # Without a shell a missing or non-executable tool surfaces as an exception
        # instead of a shell exit code; report it and carry on like before.
        log.error(f'==> Pdfinfo process ({pdfinfo_command}) could not be started: {err}')
        log.info(LOG_SEPARATOR)
        return
    with process.stdout:
        log_subprocess_output(process.stdout, 'pdf-info')
    return_code = process.wait()
    if return_code < 0:
        # Popen reports termination by signal N as return code -N.
        log.error(f'==> Pdfinfo process ({pdfinfo_command}) was terminated by signal {-return_code}')
    else:
        log.info(f'==> Pdfinfo process ({pdfinfo_command}) returned {return_code}')
    log.info(LOG_SEPARATOR)
# cross_correlate: collect author/title/subject/keywords, file hashes, label settings and
# timestamps, write them into the metadata and document info of the source PDF, save the
# result as target, and finally call timestamp_patch() on the target.
# NOTE(review): this listing has lost its indentation, so whether the pdf.docinfo assignments
# run inside or after the open_metadata() block cannot be told from here - confirm upstream.
111@no_type_check
112def cross_correlate(source: PathLike, conventions: ConventionsType, context: ContextType, target: PathLike) -> None:
113 """Load information per conventions and mix with source to create target pdf."""
114 from pikepdf import Pdf # Workaround to avoid the start-up log of backend in version calls etc.
116 source = pathlib.Path(source)
117 target = pathlib.Path(target)
 # Fixed placeholder text; it is stored under '/PTEX.FullBanner' further down.
118 backend_version = 'cf. pdf.Producer' # backend_version # 'putki 2023.1.1'
119 log.info('Retrieving producer information:')
120 log.info(LOG_SEPARATOR)
121 producer_version = get_producer()
122 creator_version = f'{CREATOR_NAME} {CREATOR_VERSION}'
 # Author comes from the file named by 'approvals-yml-path'; title, subject and keywords
 # come from the file named by 'metadata-yml-path'.
124 author = extract_author(conventions['approvals-yml-path'])
125 title, subject, keywords = extract_meta_parts(conventions['metadata-yml-path'])
126 dc_subject = subject
127 dc_creator = [author]
128 dc_title = title
 # NOTE(review): the self-assignment below has no effect.
129 keywords = keywords
 # One '<CamelizedKey>Hash' entry per conventions key containing '-tex-'; the value is
 # 'sha512:' followed by whatever hash_file() returns for the mapped value.
130 hashes = {f'{camelize_first_two(k)}Hash': f'sha512:{hash_file(v)}' for k, v in conventions.items() if '-tex-' in k}
131 for k, v in hashes.items():
132 log.info(f'- {k :17s} -> {v}')
 # Label settings: three values are read from context['label'], the remaining four are constants.
134 label_prefix = context['label']['prefix']
135 label_site_id = context['label']['site-id']
136 label_action_id = context['label']['action-id']
137 classification = 'Internal'
138 content_bits = '0'
139 enabled = 'true'
140 method = 'Privileged'
 # Change and modification times of the source file, interpreted as UTC and formatted
 # with the package's timestamp format constants.
142 st = source.stat()
143 c_time_patch = dti.datetime.fromtimestamp(st.st_ctime, tz=dti.timezone.utc).strftime(TS_FORMAT_PATCH)
145 m_time_iso = dti.datetime.fromtimestamp(st.st_mtime, tz=dti.timezone.utc).strftime(TS_FORMAT_ISO)
146 m_time_patch = dti.datetime.fromtimestamp(st.st_mtime, tz=dti.timezone.utc).strftime(TS_FORMAT_PATCH)
 # Fresh random UUID per call, stored as 'UniqueIdentity'.
148 make_unique = str(uuid.uuid4())
 # Step 1: fill the metadata object m, starting from the existing document info.
150 with Pdf.open(source) as pdf:
151 with pdf.open_metadata(set_pikepdf_as_editor=False) as m:
152 m.load_from_docinfo(pdf.docinfo)
153 m[f'{label_prefix}Enabled'] = enabled
154 m[f'{label_prefix}SetDate'] = m_time_iso # was iso
155 m[f'{label_prefix}Method'] = method
156 m[f'{label_prefix}Name'] = classification
157 m[f'{label_prefix}SiteId'] = label_site_id
158 m[f'{label_prefix}ActionId'] = label_action_id
159 m[f'{label_prefix}ContentBits'] = content_bits
160 m['Classification'] = classification
161 m['UniqueIdentity'] = make_unique
162 m['xmp:CreateDate'] = c_time_patch # was iso
163 m['xmp:ModifyDate'] = m_time_patch # was iso
164 m['xmp:CreatorTool'] = creator_version
165 m['pdf:Producer'] = producer_version
166 m['dc:subject'] = dc_subject
167 m['dc:title'] = dc_title
168 m['dc:creator'] = dc_creator
169 # m['dc:source'] = dc_source
170 # m['SourceHash'] = source_hash
171 for k, v in hashes.items():
172 m[k] = v
 # Step 2: mirror the values into the document info dictionary under '/'-prefixed keys.
173 pdf.docinfo[f'/{label_prefix}Enabled'] = m[f'{label_prefix}Enabled']
174 pdf.docinfo[f'/{label_prefix}SetDate'] = m[f'{label_prefix}SetDate']
175 pdf.docinfo[f'/{label_prefix}Method'] = m[f'{label_prefix}Method']
176 pdf.docinfo[f'/{label_prefix}Name'] = m[f'{label_prefix}Name']
177 pdf.docinfo[f'/{label_prefix}SiteId'] = m[f'{label_prefix}SiteId']
178 pdf.docinfo[f'/{label_prefix}ActionId'] = m[f'{label_prefix}ActionId']
179 pdf.docinfo[f'/{label_prefix}ContentBits'] = m[f'{label_prefix}ContentBits']
180 pdf.docinfo['/Classification'] = m['Classification']
181 pdf.docinfo['/UniqueIdentity'] = m['UniqueIdentity']
182 # pdf.docinfo['/Source'] = m['dc:source']
183 # pdf.docinfo['/SourceHash'] = m['SourceHash']
184 for k, v in hashes.items():
185 pdf.docinfo[f'/{k}'] = v
 # Only the first entry of the creator list becomes the '/Author' value.
186 pdf.docinfo['/Author'] = m['dc:creator'][0]
187 pdf.docinfo['/CreationDate'] = m['xmp:CreateDate']
188 pdf.docinfo['/Creator'] = m['xmp:CreatorTool']
189 pdf.docinfo['/Producer'] = m['pdf:Producer']
190 pdf.docinfo['/Keywords'] = keywords
191 pdf.docinfo['/ModDate'] = m['xmp:ModifyDate']
192 pdf.docinfo['/PTEX.FullBanner'] = backend_version
193 pdf.docinfo['/Subject'] = m['dc:subject']
194 pdf.docinfo['/Title'] = m['dc:title']
 # Optional request key-value pairs are stored under keys prefixed with '/Ctx'.
195 if context.get('kv_pairs'): 195 ↛ 196line 195 didn't jump to line 196, because the condition on line 195 was never true
196 for k, v in context['kv_pairs'].items():
197 pdf.docinfo[f'/Ctx{k}'] = v
 # Step 3: write the modified document to target (linearized, metadata version untouched).
198 pdf.save(target, fix_metadata_version=False, linearize=True)
200 log.info(LOG_SEPARATOR)
201 log.info('Patching the timestamps:')
 # NOTE(review): create_date receives the mtime-based value and modify_date the ctime-based
 # value - the swap looks deliberate; confirm before changing.
202 timestamp_patch(create_date=m_time_patch, modify_date=c_time_patch, path=target)
# patch: entry point that reads in_pdf, out_pdf, cfg_path, enforce and kv_pairs from options,
# loads label context and conventions, reports on the source PDF, writes the labeled target
# via cross_correlate(), reports on the target, and optionally copies it over the source.
# Returns 0 at the end of the visible path.
# NOTE(review): this listing has lost its indentation; whether the statements on lines 236 and
# 262 sit inside the preceding if-blocks cannot be told from here - confirm upstream.
205def patch(options: argparse.Namespace) -> int:
206 """Patch the two related meta structures of the pdf file"""
207 in_pdf = pathlib.Path(options.in_pdf)
208 out_pdf = pathlib.Path(options.out_pdf)
209 cfg_path = pathlib.Path(options.cfg_path)
210 enforce = options.enforce
211 log.info(f'Patching pdf meta data of {in_pdf}')
212 log.info(f'Configuration path is {cfg_path}')
213 log.info(f'Output path is {out_pdf}')
 # Announce up front whether the source file will be overwritten at the end.
214 if enforce: 214 ↛ 215line 214 didn't jump to line 215, because the condition on line 214 was never true
215 log.warning(f'Labeling will be enforced by overwriting {in_pdf}')
216 else:
217 log.info(f'Labeling will NOT be enforced - {in_pdf} will not be modified')
 # The start time (UTC) is kept to report the duration at the end.
219 start_time = dti.datetime.now(tz=dti.timezone.utc)
220 start_ts = start_time.strftime(TS_FORMAT_PAYLOADS)
221 log.info(f'Start timestamp ({start_ts})')
223 log.info(LOG_SEPARATOR)
 # Label context loaded from the configuration path; its 'label' mapping is logged.
224 context = load_label_context(cfg_path)
225 log.info('loaded label context:')
226 for k, v in context['label'].items():
227 log.info(f'- {k :17s} -> {v}')
228 log.info(LOG_SEPARATOR)
 # Key-value pairs from the request are copied into context['kv_pairs'] when present.
230 if options.kv_pairs: 230 ↛ 231line 230 didn't jump to line 231, because the condition on line 230 was never true
231 log.info('key-value pairs from request for context kv_pairs:')
232 context['kv_pairs'] = {}
233 for k, v in options.kv_pairs.items(): 233 ↛ 234line 233 didn't jump to line 234, because the loop on line 233 never started
234 context['kv_pairs'][k] = v
235 log.info(f'- {k :17s} -> {v}')
236 log.info(LOG_SEPARATOR)
 # Conventions are derived from the context and the input PDF path, then logged.
238 conventions = load_conventions(context, in_pdf)
239 log.info('identified conventions:')
240 for k, v in conventions.items():
241 log.info(f'- {k :17s} -> {v}')
242 log.info(LOG_SEPARATOR)
 # Report on the source file, create the target, then report on the target.
244 log.info(LOG_SEPARATOR)
245 log.info('PDF information from source file:')
246 log.info(LOG_SEPARATOR)
247 pdf_info(in_pdf)
248 log.info('PDF attributes/labels from source file:')
249 log.info(LOG_SEPARATOR)
250 pdf_attributes(in_pdf)
251 cross_correlate(source=in_pdf, conventions=conventions, context=context, target=out_pdf)
252 log.info('PDF attributes/labels from target file:')
253 log.info(LOG_SEPARATOR)
254 pdf_attributes(out_pdf)
255 log.info('PDF information from target file:')
256 log.info(LOG_SEPARATOR)
257 pdf_info(out_pdf)
 # Enforcing copies the labeled target over the source file (shutil.copy2).
259 if enforce: 259 ↛ 260line 259 didn't jump to line 260, because the condition on line 259 was never true
260 log.warning(f'Enforcing labels by overwriting {in_pdf}')
261 shutil.copy2(out_pdf, in_pdf)
262 log.info(f'- {out_pdf} -> {in_pdf}')
 # Report end time and elapsed seconds.
264 end_time = dti.datetime.now(tz=dti.timezone.utc)
265 end_ts = end_time.strftime(TS_FORMAT_PAYLOADS)
266 duration_secs = (end_time - start_time).total_seconds()
267 log.info(f'End timestamp ({end_ts})')
268 log.info(f'Patched {in_pdf} document and wrote {out_pdf} in {duration_secs} secs')
270 return 0