2
0
mirror of https://expo.survex.com/repositories/troggle/.git synced 2025-12-17 17:47:13 +00:00
Files
troggle/parsers/drawings.py
2025-12-15 22:50:08 +00:00

512 lines
23 KiB
Python

import os
import re
from pathlib import Path
import time
import settings
from troggle.core.models.survex import DrawingFile
from troggle.core.models.troggle import DataIssue
from troggle.core.models.wallets import Wallet
"""Searches through all the :drawings: repository looking
for tunnel and therion files
"""
todo = """
- Fix lost links to 1999 wallets now they have been renamed
- implement: findimportinsert(therionfile, imp)
Tries to link the scrap (Therion format) to the referenced therion scrap
"""
# Regexes compiled once at module level (they are applied per-file in loops).
rx_xth_me = re.compile(r"xth_me_image_insert.*{.*}$", re.MULTILINE)  # xTherion image-insert lines in .th2 files
rx_scrap = re.compile(r"^survey (\w*).*$", re.MULTILINE)  # NOTE(review): named rx_scrap but matches 'survey <name>' lines — confirm intent
rx_input = re.compile(r"^input ", re.MULTILINE)  # therion 'input' directives (sub-file inclusion)
rx_line = re.compile(r"^line ", re.MULTILINE)  # therion 'line' elements; used to count paths in .th2 files
rx_ref = re.compile(r"^#?\s?ref\s*\:?\s*([^\s\t]*)", re.MULTILINE)  # captures the wallet token after an (optionally commented) 'ref'
rx_skpath = re.compile(r"<skpath")  # Tunnel <skpath elements; used to count paths
rx_pcpath = re.compile(r'<pcarea area_signal="frame".*?sfsketch="([^"]*)" sfstyle="([^"]*)"')  # captures (sketch path, style)
rx_pctext = re.compile(r'pctext.*?\*ref&space;([^&]*)')  # captures the *ref wallet id embedded in a pctext element
# Supported suffixes are stored with a leading dot and lowercase (matches Path.suffix)
# We factor out the image-like extensions which are common to both sets so they
# are defined once and then reused to build the other sets.
IMAGE_LIKE_EXTS = {".png", ".jpg", ".jpeg", ".pdf", ".gif", ".txt", ".svg"}
# Extensions that we treat as "image-like" for the purposes of reference parsing
IMAGE_EXTS = set(IMAGE_LIKE_EXTS)
# Supported extensions include image-like ones plus drawing/text-specific ones
SUPPORTED_EXTENSIONS = IMAGE_LIKE_EXTS.union({".xml", ".th", ".th2"})
def _is_supported_suffix(suffix: str) -> bool:
"""Return True if `suffix` (e.g. '.png') is a supported drawings extension."""
if not suffix:
return False
return suffix.lower() in SUPPORTED_EXTENSIONS
def _is_image_suffix(suffix: str) -> bool:
"""Return True if `suffix` looks like an image/scan-type suffix."""
if not suffix:
return False
return suffix.lower() in IMAGE_EXTS
def fetch_drawingfiles_by_paths(paths, chunk_size: int = 500):
    """Fetch DrawingFile objects for an iterable of dwgpath values, in chunks.

    Chunking keeps each SQL IN(...) clause small so we never exceed the DB
    bound-parameter limit (SQLite defaults to 999 bound variables). The
    default chunk_size of 500 is conservative and works across backends.

    Returns a dict mapping dwgpath -> DrawingFile (first match wins when
    duplicates exist).
    """
    found = {}
    if not paths:
        return found
    # Materialize to a list so we can slice it into chunks.
    rel_paths = list(paths)
    for start in range(0, len(rel_paths), chunk_size):
        batch = rel_paths[start : start + chunk_size]
        for dwg in DrawingFile.objects.filter(dwgpath__in=batch):
            # Preserve the first object seen for any duplicated dwgpath.
            if dwg.dwgpath not in found:
                found[dwg.dwgpath] = dwg
    return found
rx_wallet = re.compile(r"""
# Extracts a wallet-name prefix (Group 1) and a filename (Group 2) from the
# END of a path string. Covers current AND HISTORIC wallet namings, as images
# have been edited over the years.
# --- Group 1: Directory or Survey Prefix ---
(                        # Start of Capture Group 1
\d{4}\#X?\d+\w?          # e.g. "2025#123", "2016#X04" or "1999#45a"; NB '#' must be escaped in VERBOSE mode
|                        # OR
1995-96kh                # literal "1995-96kh"
|
1989LUSS                 # literal "1989LUSS"
|                        # OR
1989Surveybook           # literal "1989Surveybook"
|                        # OR
1990NotKHsurveybook      # literal "1990NotKHsurveybook"
|                        # OR
199?kh                   # NOTE(review): '?' is a quantifier here, so this matches "19kh" or "199kh" — confirm a literal "199?kh" was not intended
|                        # OR
199?notkh                # NOTE(review): likewise matches "19notkh" or "199notkh"
|                        # OR
199?Surveybookkh         # NOTE(review): likewise matches "19Surveybookkh" or "199Surveybookkh"
|                        # OR
1992-94NotKHSurveybook   # literal "1992-94NotKHSurveybook"
|                        # OR
92-94Surveybookkh        # literal "92-94Surveybookkh"
|                        # OR
1991surveybook           # literal "1991surveybook" (was listed twice; deduplicated — no behaviour change)
|                        # OR
smkhs                    # This is now expofiles/surveys/smkhs/ not in surveyscans/ at all.
)                        # End of Capture Group 1
/                        # A literal forward slash separating the parts
# --- Group 2: Filename ---
(                        # Start of Capture Group 2
.*?                      # Non-greedily match the filename stem (any character)
(?:                      # Non-capturing group for the recognised extensions
png|jpg|pdf|jpeg|gif|txt|svg
)                        # End of the extension group
)                        # End of Capture Group 2
$                        # Anchor, ensuring the match is at the end of the string
""", re.VERBOSE | re.IGNORECASE)
def _set_filesize_and_check(fullpath, model_obj, parser_label, url_prefix="/dwgdataraw"):
"""Set model_obj.filesize from filesystem and create DataIssue if missing/zero.
Returns True if file exists and has size > 0, False otherwise.
"""
try:
fullpath = Path(fullpath)
size = fullpath.stat().st_size
except Exception as e:
message = f"! Unable to stat file {fullpath}: {e}"
print(message)
DataIssue.objects.update_or_create(parser=parser_label, message=message, url=f"{url_prefix}/{getattr(model_obj, 'dwgpath', '')}")
return False
model_obj.filesize = size
# Do not save here; caller should include 'filesize' in the bulk update set.
if size <= 0:
message = f"! Zero length {parser_label.lower()} file {fullpath}"
print(message)
DataIssue.objects.update_or_create(parser=parser_label, message=message, url=f"{url_prefix}/{getattr(model_obj, 'dwgpath', '')}")
return False
return True
def _read_text_file(fullpath):
"""Read text file robustly, returning a str (falls back to binary decode)."""
try:
path = Path(fullpath)
with path.open("r", encoding="utf-8", errors="replace") as fh:
return fh.read()
except Exception:
try:
with path.open("rb") as fh:
return fh.read().decode("utf-8", errors="replace")
except Exception as e:
print(f"! Unable to read file {fullpath}: {e}")
return ""
def parse_tnl_file(dwgfile, path):
    """Link a Tunnel drawing file to wallets/scans referenced by `path`.

    `path` is a line of text which may (or may not) contain the recognisable
    name of a scanned file already imported from the surveyscans directories.
    The cross-references found here tie the drawing to its wallet and hence to
    the original survey data (Tunnel files embed a survex centreline).

    Note: the resulting scanfile list reflects the wallet as it was when the
    drawing was created, not as it is now — perhaps the table heading should
    say so.
    """
    # All the real work happens in the shared reference processor.
    _process_reference(dwgfile, path, parser_label="Tunnel")
def _handle_obsolete_wallets(old_wallet, dwgfile, path, parser_label):
    """Record a DataIssue for a wallet name that is no longer a valid wallet."""
    issue_url = f"/dwgdataraw/{path}"
    message = f"- Warning {old_wallet} not a currently valid wallet name. In {path}"
    print(message)
    DataIssue.objects.update_or_create(parser=parser_label, message=message, url=issue_url)
def _process_reference(dwgfile, path, parser_label="Tunnel"):
    """Unified processor to link drawing files to wallets/scans or referenced drawings.

    - If `path` matches a wallet pattern (rx_wallet), link the wallet and try to find the scan file in the wallet.
    - If `path` looks like an image, do nothing (images are not treated as references here - yet).
    - Otherwise, treat `path` as a possible reference to another drawing (by name) and link via `dwgcontains`.

    Returns a (wallet, scansfile) pair; either or both may be None.
    """
    wallet, scansfile = None, None
    if not path:
        return None, None
    if mscansdir := rx_wallet.search(path):
        # group(1) is the wallet name, group(2) the scan filename within it.
        scanswalletl = Wallet.objects.filter(walletname=mscansdir.group(1))  # wallet name
        if len(scanswalletl):
            wallet = scanswalletl[0]
            if len(scanswalletl) > 1:
                # Duplicate wallet rows should not happen; warn but carry on with the first.
                message = f"! More than one scan FOLDER matches filter query. [{scanswalletl[0]}]: {mscansdir.group(1)} {mscansdir.group(2)} {dwgfile.dwgpath} {path}"
                print(message)
                DataIssue.objects.update_or_create(parser=parser_label, message=message)
        else:  # found a wallet name, but it is not one we recognise as having been imported
            _handle_obsolete_wallets(mscansdir.group(1),dwgfile, path, parser_label)
        if wallet:
            scansfilel = wallet.singlescan_set.filter(name=mscansdir.group(2))  # file name
            if len(scansfilel):
                if len(scansfilel) > 1:
                    plist = [sf.ffile for sf in scansfilel]
                    message = f"! More than one image FILENAME matches filter query. [{scansfilel[0]}]: {mscansdir.group(1)} {mscansdir.group(2)} {dwgfile.dwgpath} {path} {plist}"
                    print(message)
                    DataIssue.objects.update_or_create(parser=parser_label, message=message)
                scansfile = scansfilel[0]
        # Record the links on the many-to-many relations.
        if wallet:
            dwgfile.dwgwallets.add(wallet)
        if scansfile:
            dwgfile.scans.add(scansfile)
        # If a wallet was found but no scan was associated from the wallet, record a DataIssue. There are a lot of these..
        if wallet and not scansfile:
            scanfilename = Path(path).name
            message = f"! In '{wallet.walletname}' scanned file is not actually found '{scanfilename}' in '{path}'"
            wurl = f"/survey_scans/{wallet.walletname}/".replace("#", ":")
            DataIssue.objects.update_or_create(parser=parser_label, message=message, url=wurl)
        return wallet, scansfile
    # Not a wallet reference; check image extension and possibly drawing-to-drawing reference
    suffix = Path(path).suffix.lower()
    if _is_image_suffix(suffix):
        # It's an image/scanned file type; we don't treat it as a referenced drawing, though in future we should note the link
        return None, None
    # Not an image file: perhaps a reference to another drawing (no ext or other ext)
    name = Path(path).name
    rdwgfilel = DrawingFile.objects.filter(dwgname=name)  # Check if it is another drawing file we have already seen
    if len(rdwgfilel):
        if len(rdwgfilel) > 1:
            plist = [df.dwgpath for df in rdwgfilel]
            message = f"- Warning {len(rdwgfilel)} files named '{name}' {plist}"
            print(message)
            DataIssue.objects.update_or_create(parser=parser_label, message=message, url=f"/dwgdataraw/{path}")
        rdwgfile = rdwgfilel[0]
        if hasattr(dwgfile, 'dwgcontains'):  # implement model change in models/survex.py to use this
            dwgfile.dwgcontains.add(rdwgfile)
    return None, None
def findimportinsert(therionfile, imp):
    """Stub: should link a Therion 'input' reference to the scrap it names.

    Intentionally a no-op for now — implementation is listed in the module
    `todo` string.
    """
    pass
def _assign_wallets_for_model(model_obj, wallet_names, parser_label="Tunnel"):
"""Assign wallets to `model_obj` by wallet name(s).
wallet_names may be a single string or an iterable of names. This function
will add any Wallets found via Wallet.objects.filter(walletname__in=...) to
model_obj.dwgwallets and return the list of matched Wallet objects. If none
are found, or an exception occurs, a DataIssue is recorded with parser
set to `parser_label`.
"""
if not wallet_names:
return []
# Normalize to list of names
if isinstance(wallet_names, (str, bytes)):
names = [str(wallet_names)]
else:
try:
names = [str(n) for n in wallet_names]
except Exception:
names = [str(wallet_names)]
try:
wallets = list(Wallet.objects.filter(walletname__in=names))
if wallets:
for w in wallets:
model_obj.dwgwallets.add(w)
return wallets
# Nothing found: record a DataIssue
message = f" ! wallet(s) '{names}' not found from {getattr(model_obj, 'dwgpath', model_obj)}"
print(message)
DataIssue.objects.update_or_create(parser=parser_label, message=message, url=f"/dwgdataraw/{getattr(model_obj, 'dwgpath', '')}")
return []
except Exception as e:
message = f" ! Exception while looking up wallet(s) '{names}' from {getattr(model_obj, 'dwgpath', model_obj)} -- ({e})"
print(message)
DataIssue.objects.update_or_create(parser=parser_label, message=message, url=f"/dwgdataraw/{getattr(model_obj, 'dwgpath', '')}")
return []
def settherionfileinfo(filetuple):
    """Read in the drawing file contents and sets values on the dwgfile object.

    `filetuple` is ("th"|"th2", DrawingFile). Returns the set of scalar field
    names modified; the caller performs the bulk_update, nothing is saved here.
    """
    thtype, therionfile = filetuple
    ff = Path(settings.DRAWINGS_DATA) / therionfile.dwgpath
    modified = set()
    if _set_filesize_and_check(ff, therionfile, "Therion"):
        modified.add("filesize")
    ttext = _read_text_file(ff)
    # The equivalent for a tunnel 'path' would be a .th2 'line wall' or 'scrap'
    # print(len(re.findall(r"line", ttext)))
    if thtype == "th":
        # .th files: count 'input' directives as the path count.
        therionfile.npaths = len(rx_input.findall(ttext))
        modified.add("npaths")
        if wallet_texts := rx_ref.findall(ttext):
            # Delegate wallet assignment to helper; use parser_label 'Therion'
            _assign_wallets_for_model(therionfile, wallet_texts, parser_label="Therion")
    elif thtype == "th2":
        # .th2 files: count 'line' elements instead.
        therionfile.npaths = len(rx_line.findall(ttext))
        modified.add("npaths")
    # scan and look for survex blocks that might have been included, and image scans (as for tunnel drawings)
    # which would populate dwgfile.survexfile
    # in .th2 files:
    # ##XTHERION## xth_me_image_insert {500 1 1.0} {1700 {}} ../../../expofiles/surveyscans/2014/01popped_elev1.jpeg 0 {}
    # scrap blownout -projection plan -scale [-81.0 -42.0 216.0 -42.0 0.0 0.0 7.5438 0.0 m]
    for xth_me in rx_xth_me.findall(ttext):
        # WORK IN PROGRESS. Do not clutter up the DataIssues list with this
        # Surely not needed for .th files ?? only .th2 ?
        message = f"! Un-parsed image filename: {therionfile.dwgname} : {xth_me.split()[-3]} - {therionfile.dwgpath}"
        # print(message)
        # DataIssue.objects.update_or_create(parser='xTherion', message=message, url=f'/dwgdataraw/{therionfile.dwgpath}')
        # ! Un-parsed image filename: 107coldest : ../../../expofiles/surveyscans/2015/2015#20/notes.jpg - therion/plan/107coldest.th2
        # Append to a local log file instead of the DataIssues table (see above).
        with open("therionrefs.log", "a") as lg:
            lg.write(message + "\n")
        # The image path is the third-from-last whitespace-separated token.
        foundpath = xth_me.split()[-3].strip("{}")
        _process_reference(therionfile, foundpath, parser_label="Therion")
    for inp in rx_input.findall(ttext):
        # if this 'input' is a .th2 file we have already seen, then we can assign this as a sub-file
        # but we would need to disentangle to get the current path properly
        message = f"! Un-set (?) Therion .th2 input: - {therionfile.dwgname} : {inp} - {therionfile.dwgpath}"
        # print(message)
        DataIssue.objects.update_or_create(parser="xTherion", message=message, url=f"/dwgdataraw/{therionfile.dwgpath}")
        findimportinsert(therionfile, inp)  # currently a no-op stub
    # Defer saving scalar fields; caller will perform bulk_update.
    return modified
def settnlfileinfo(dwgfile):
    """Read a Tunnel drawing file and set values on the dwgfile object.

    Tunnel files are unfortunately not fully compliant XML, so no XML parser
    can be used — regexes instead. Thanks Julian. *ref wallet identifiers may
    be found in at least two different places in tunnel files. We should also
    try to read the date (e.g. tunneldate="2010-08-16 22:51:57") so drawings
    could be shown on the master calendar per expo.

    Returns the set of scalar field names modified; the caller performs the
    bulk_update (nothing is saved here).
    """
    fullpath = Path(settings.DRAWINGS_DATA) / dwgfile.dwgpath
    modified = set()
    if _set_filesize_and_check(fullpath, dwgfile, "Tunnel"):
        modified.add("filesize")
    contents = _read_text_file(fullpath)
    # Count <skpath elements as the number of drawn paths.
    dwgfile.npaths = len(rx_skpath.findall(contents))
    modified.add("npaths")
    # <pcarea area_signal="frame" ... sfsketch="204description/scans/plan(38).png" sfstyle="" ...>
    # or sfsketch="surveyscans/2025/2025#41/plan_diddlypot.png"
    for sketch_path, _style in rx_pcpath.findall(contents):
        parse_tnl_file(dwgfile, sketch_path)
    # <pctext style="survey" ...> *file_begin ... | *ref 2025#20 | ...
    for ref in rx_pctext.findall(contents):
        if ref:
            # Delegate wallet lookup/assignment to the shared helper.
            _assign_wallets_for_model(dwgfile, ref, parser_label="Tunnel")
    # TODO: also scan for included survex blocks and image scans, which would
    # populate dwgfile.survexfile
    return modified
def setdrwfileinfo(dwgfile):
    """Set filesize for SVG/PDF/.txt drawings.

    These formats have no useful internal structure to search, so only the
    size is recorded. Nothing is saved here: the returned set of modified
    field names feeds the caller's bulk_update.
    """
    fullpath = Path(settings.DRAWINGS_DATA) / dwgfile.dwgpath
    if _set_filesize_and_check(fullpath, dwgfile, "drawings"):
        return {"filesize"}
    return set()
def load_drawings_files():
    """Re-import every drawing file found under settings.DRAWINGS_DATA.

    Deletes all existing DrawingFile rows and related DataIssues, walks the
    drawings tree, bulk-creates DrawingFile rows, parses each file per its
    extension, and bulk-updates the scalar fields (filesize, npaths).

    (Why all this detection of file types?! Why not use get_mime_types? We
    import JPG, PNG and SVG files which have already been put on the server,
    but the upload form intentionally refuses PNG and JPG — it does allow SVG.)
    """
    # Track elapsed time
    start_time = time.perf_counter()
    all_xml = []
    drawdatadir = Path(settings.DRAWINGS_DATA)
    # Full re-import: wipe previous rows and their parser DataIssues.
    DrawingFile.objects.all().delete()
    DataIssue.objects.filter(parser="drawings").delete()
    DataIssue.objects.filter(parser="Therion").delete()
    DataIssue.objects.filter(parser="xTherion").delete()
    DataIssue.objects.filter(parser="Tunnel").delete()
    if os.path.isfile("therionrefs.log"):
        os.remove("therionrefs.log")
    # Walk the tree with pathlib, skip hidden and backup files
    files_meta = []  # list of tuples (ext, rel_path, dwgname, pathobj)
    for p in drawdatadir.rglob('*'):
        # Ignore anything under a .git directory
        if '.git' in p.parts:
            continue
        if p.name.startswith('.') or p.name.endswith('~'):
            continue
        if p.is_dir():
            continue
        suffix = p.suffix.lower()
        if _is_supported_suffix(suffix) or suffix == '':
            rel = p.relative_to(drawdatadir).as_posix()
            if suffix == '':
                # No extension: the whole name is the drawing name.
                dwgname = p.name
                ext = ''
            else:
                dwgname = p.stem
                ext = suffix[1:]  # extension without the leading dot
            files_meta.append((ext, rel, dwgname, p))
    # Bulk create DrawingFile instances to avoid many individual DB saves
    if files_meta:
        objs_to_create = [DrawingFile(dwgpath=rel, dwgname=dwgname) for (_, rel, dwgname, _) in files_meta]
        # Use chunks to avoid huge single queries
        chunk_size = 700
        for i in range(0, len(objs_to_create), chunk_size):
            DrawingFile.objects.bulk_create(objs_to_create[i : i + chunk_size])
        # Re-fetch created objects and map by dwgpath using a chunked fetch helper
        rel_paths = [rel for (_, rel, _, _) in files_meta]
        mapping = fetch_drawingfiles_by_paths(rel_paths, chunk_size=500)
        # Reconstruct all_xml using the created model instances
        for ext, rel, _, p in files_meta:
            dwgfile = mapping.get(rel)
            if dwgfile:
                all_xml.append((ext, dwgfile, p))
    elapsed = time.perf_counter() - start_time
    print(f" - {len(all_xml)} Drawings files found ({elapsed:.2f}s)")
    # Process in a deterministic order; ensure .th2 are handled before .th
    ext_priority = {'th2': 0, 'th': 1}
    all_xml.sort(key=lambda t: ext_priority.get(t[0], 2))
    # Process files and collect modified scalar fields for bulk update
    modified_map = {}  # {DrawingFile instance: set(fields)}
    for extension, dwgfile, pathobj in all_xml:
        modified = set()
        # Fix: "jpeg" and "gif" are accepted by _is_supported_suffix (so rows
        # were created for them) but previously fell through every branch here
        # and never had their filesize set. Include them with the other
        # no-internal-format types.
        if extension in {"pdf", "txt", "svg", "jpg", "jpeg", "gif", "png", ""}:
            modified = setdrwfileinfo(dwgfile) or set()
        elif extension == "xml":
            modified = settnlfileinfo(dwgfile) or set()
        # important to import .th2 files before .th so that we can assign them when found in .th files
        elif extension == "th2":
            modified = settherionfileinfo(("th2", dwgfile)) or set()
        elif extension == "th":
            modified = settherionfileinfo(("th", dwgfile)) or set()
        if modified:
            modified_map.setdefault(dwgfile, set()).update(modified)
    elapsed = time.perf_counter() - start_time
    print(f" - Drawings parsed ({elapsed:.2f}s)")
    # Bulk update scalar fields grouped by identical field-sets to use bulk_update efficiently
    from collections import defaultdict

    groups = defaultdict(list)  # {tuple(fields): [instances]}
    for inst, fields in modified_map.items():
        key = tuple(sorted(fields))
        groups[key].append(inst)
    for fields_tuple, instances in groups.items():
        fields_list = list(fields_tuple)
        # Use a conservative batch size
        DrawingFile.objects.bulk_update(instances, fields_list, batch_size=500)
    elapsed = time.perf_counter() - start_time
    print(f" - Database updated ({elapsed:.2f}s)")
    # for drawfile in DrawingFile.objects.all():
    #     SetTunnelfileInfo(drawfile)