mirror of
https://expo.survex.com/repositories/troggle/.git
synced 2025-12-18 07:07:10 +00:00
refactor using Pathlib (AI written)
This commit is contained in:
60
core/TESTS/test_drawings.py
Normal file
60
core/TESTS/test_drawings.py
Normal file
@@ -0,0 +1,60 @@
|
||||
import os
|
||||
import pathlib
|
||||
import tempfile
|
||||
|
||||
from django.test import TestCase
|
||||
|
||||
import settings
|
||||
from troggle.parsers import drawings
|
||||
from troggle.core.models.survex import DrawingFile
|
||||
|
||||
|
||||
class DrawingsPathlibTests(TestCase):
    """Check which files drawings.load_drawings_files() records as DrawingFile rows.

    Each test builds a small tree under its own temporary directory. setUp()
    points settings.DRAWINGS_DATA at that directory and registers cleanups
    that restore the previous value and delete the directory.
    """

    def setUp(self):
        # Fresh, empty drawings root per test; removed on cleanup even if the test fails.
        tmpdir = tempfile.TemporaryDirectory()
        self.addCleanup(tmpdir.cleanup)
        self.root = pathlib.Path(tmpdir.name)

        # settings is an imported module, so assigning to it outlives the test
        # unless undone; without the restore, later tests would see a path to
        # a directory that no longer exists.
        had_setting = hasattr(settings, "DRAWINGS_DATA")
        previous = getattr(settings, "DRAWINGS_DATA", None)
        self.addCleanup(self._restore_drawings_data, had_setting, previous)
        settings.DRAWINGS_DATA = tmpdir.name

    @staticmethod
    def _restore_drawings_data(had_setting, previous):
        """Put settings.DRAWINGS_DATA back to the state it had before setUp()."""
        if had_setting:
            settings.DRAWINGS_DATA = previous
        elif hasattr(settings, "DRAWINGS_DATA"):
            del settings.DRAWINGS_DATA

    def _is_recorded(self, dwgpath):
        """Return True if a DrawingFile row exists with exactly this dwgpath."""
        return DrawingFile.objects.filter(dwgpath=dwgpath).exists()

    def test_load_drawings_creates_expected_entries(self):
        # create a small tree
        (self.root / 'one.pdf').write_text('pdf')
        (self.root / 'two.txt').write_text('txt')
        sub = self.root / 'dir'
        sub.mkdir()
        (sub / 'three.png').write_text('png')
        sub2 = self.root / 'dir2'
        sub2.mkdir()
        (sub2 / 'abc.th2').write_text('th2')
        (sub2 / 'abc.th').write_text('th')

        drawings.load_drawings_files()

        # all files should be present, keyed by their POSIX-style path relative to the root
        expected = ('one.pdf', 'two.txt', 'dir/three.png', 'dir2/abc.th2', 'dir2/abc.th')
        for dwgpath in expected:
            with self.subTest(dwgpath=dwgpath):
                self.assertTrue(self._is_recorded(dwgpath))

    def test_hidden_and_backup_skipped(self):
        # NOTE: only top-level names are exercised here; files that sit inside
        # a hidden directory are not covered by this test.
        (self.root / '.hidden').write_text('hid')
        (self.root / 'file~').write_text('bak')

        drawings.load_drawings_files()

        # Should not import hidden or backup files
        for dwgpath in ('.hidden', 'file~'):
            with self.subTest(dwgpath=dwgpath):
                self.assertFalse(self._is_recorded(dwgpath))

    def test_no_extension_file(self):
        (self.root / 'noext').write_text('data')

        drawings.load_drawings_files()

        self.assertTrue(self._is_recorded('noext'))
|
||||
@@ -1,6 +1,5 @@
|
||||
import os
|
||||
import re
|
||||
import stat
|
||||
from pathlib import Path
|
||||
|
||||
import settings
|
||||
@@ -35,7 +34,7 @@ rx_pcpath = re.compile(r'<pcarea area_signal="frame".*?sfsketch="([^"]*)" sfstyl
|
||||
rx_pctext = re.compile(r'pctext.*?\*ref&space;([^&]*)')
|
||||
|
||||
|
||||
rx_valid_ext = re.compile(r"(?i)\.(?:png|jpg|pdf|jpeg|gif|txt|svg)$")
|
||||
IMAGE_EXTS = {".png", ".jpg", ".jpeg", ".pdf", ".gif", ".txt", ".svg"}
|
||||
rx_wallet = re.compile(r"""
|
||||
# r"(\d\d\d\d#X?\d+\w?|1995-96kh|92-94Surveybookkh|1991surveybook|smkhs)/(.*?(?:png|jpg|pdf|jpeg|gif|txt))$", path
|
||||
# This regex is designed to extract a specific directory prefix and a filename
|
||||
@@ -71,7 +70,8 @@ def _set_filesize_and_check(fullpath, model_obj, parser_label, url_prefix="/dwgd
|
||||
Returns True if file exists and has size > 0, False otherwise.
|
||||
"""
|
||||
try:
|
||||
size = Path(fullpath).stat().st_size
|
||||
fullpath = Path(fullpath)
|
||||
size = fullpath.stat().st_size
|
||||
except Exception as e:
|
||||
message = f"! Unable to stat file {fullpath}: {e}"
|
||||
print(message)
|
||||
@@ -92,11 +92,12 @@ def _set_filesize_and_check(fullpath, model_obj, parser_label, url_prefix="/dwgd
|
||||
def _read_text_file(fullpath):
|
||||
"""Read text file robustly, returning a str (falls back to binary decode)."""
|
||||
try:
|
||||
with open(fullpath, "r", encoding="utf-8", errors="replace") as fh:
|
||||
path = Path(fullpath)
|
||||
with path.open("r", encoding="utf-8", errors="replace") as fh:
|
||||
return fh.read()
|
||||
except Exception:
|
||||
try:
|
||||
with open(fullpath, "rb") as fh:
|
||||
with path.open("rb") as fh:
|
||||
return fh.read().decode("utf-8", errors="replace")
|
||||
except Exception as e:
|
||||
print(f"! Unable to read file {fullpath}: {e}")
|
||||
@@ -122,7 +123,7 @@ def parse_tnl_file(dwgfile, path):
|
||||
if len(scanswalletl):
|
||||
wallet = scanswalletl[0]
|
||||
if len(scanswalletl) > 1:
|
||||
message = f"! More than one scan FOLDER matches filter query. [{scansfilel[0]}]: {mscansdir.group(1)} {mscansdir.group(2)} {dwgfile.dwgpath} {path}"
|
||||
message = f"! More than one scan FOLDER matches filter query. [{scanswalletl[0]}]: {mscansdir.group(1)} {mscansdir.group(2)} {dwgfile.dwgpath} {path}"
|
||||
print(message)
|
||||
DataIssue.objects.create(parser="Tunnel", message=message)
|
||||
|
||||
@@ -130,9 +131,7 @@ def parse_tnl_file(dwgfile, path):
|
||||
scansfilel = wallet.singlescan_set.filter(name=mscansdir.group(2))
|
||||
if len(scansfilel):
|
||||
if len(scansfilel) > 1:
|
||||
plist = []
|
||||
for sf in scansfilel:
|
||||
plist.append(sf.ffile)
|
||||
plist = [sf.ffile for sf in scansfilel]
|
||||
message = f"! More than one image FILENAME matches filter query. [{scansfilel[0]}]: {mscansdir.group(1)} {mscansdir.group(2)} {dwgfile.dwgpath} {path} {plist}"
|
||||
print(message)
|
||||
DataIssue.objects.create(parser="Tunnel", message=message)
|
||||
@@ -143,10 +142,13 @@ def parse_tnl_file(dwgfile, path):
|
||||
if scansfile:
|
||||
dwgfile.scans.add(scansfile)
|
||||
|
||||
elif path and not rx_valid_ext.search(
|
||||
path
|
||||
): # ie not recognised as a path where wallets live and not an image file type
|
||||
name = os.path.split(path)[1]
|
||||
elif path:
|
||||
suffix = Path(path).suffix.lower()
|
||||
if suffix in IMAGE_EXTS:
|
||||
# It's an image/scanned file type; we don't treat it as a referenced drawing
|
||||
return
|
||||
# Not an image file: perhaps a reference to another drawing (no ext or other ext)
|
||||
name = Path(path).name
|
||||
rdwgfilel = DrawingFile.objects.filter(dwgname=name) # Check if it is another drawing file we have already seen
|
||||
if len(rdwgfilel):
|
||||
if len(rdwgfilel) > 1:
|
||||
@@ -173,9 +175,7 @@ def findwalletimage(therionfile, foundpath):
|
||||
if len(scanswalletl):
|
||||
wallet = scanswalletl[0]
|
||||
if len(scanswalletl) > 1:
|
||||
message = "! More than one scan FOLDER matches filter query. [{}]: {} {} {}".format(
|
||||
therionfile, mscansdir.group(1), foundpath
|
||||
)
|
||||
message = f"! More than one scan FOLDER matches filter query. [{therionfile}]: {mscansdir.group(1)} {foundpath}"
|
||||
print(message)
|
||||
DataIssue.objects.create(parser="Therion", message=message)
|
||||
if wallet:
|
||||
@@ -187,10 +187,8 @@ def findwalletimage(therionfile, foundpath):
|
||||
# message = f'! {len(scansfilel)} {scansfilel} = {scanfilename} found in the wallet specified {wallet.walletname}'
|
||||
# print(message)
|
||||
if len(scansfilel) > 1:
|
||||
plist = []
|
||||
for sf in scansfilel:
|
||||
plist.append(sf.ffile)
|
||||
message = f"! More than one image FILENAME matches filter query. [{scansfilel[0]}]: {mscansdir.group(1)} {mscansdir.group(2)} {dwgfile.dwgpath} {path} {plist}"
|
||||
plist = [sf.ffile for sf in scansfilel]
|
||||
message = f"! More than one image FILENAME matches filter query. [{scansfilel[0]}]: {mscansdir.group(1)} {foundpath} {plist}"
|
||||
print(message)
|
||||
DataIssue.objects.create(parser="Therion", message=message)
|
||||
scansfile = scansfilel[0]
|
||||
@@ -211,7 +209,7 @@ def settherionfileinfo(filetuple):
|
||||
"""Read in the drawing file contents and sets values on the dwgfile object"""
|
||||
thtype, therionfile = filetuple
|
||||
|
||||
ff = os.path.join(settings.DRAWINGS_DATA, therionfile.dwgpath)
|
||||
ff = Path(settings.DRAWINGS_DATA) / therionfile.dwgpath
|
||||
if not _set_filesize_and_check(ff, therionfile, "Therion"):
|
||||
return
|
||||
|
||||
@@ -272,7 +270,7 @@ def settnlfileinfo(dwgfile):
|
||||
|
||||
*ref wallet identifiers may be found in at least two different places in tunnel files.
|
||||
"""
|
||||
ff = os.path.join(settings.DRAWINGS_DATA, dwgfile.dwgpath)
|
||||
ff = Path(settings.DRAWINGS_DATA) / dwgfile.dwgpath
|
||||
if not _set_filesize_and_check(ff, dwgfile, "Tunnel"):
|
||||
return
|
||||
|
||||
@@ -297,8 +295,8 @@ def settnlfileinfo(dwgfile):
|
||||
if wallets:
|
||||
for w in wallets:
|
||||
dwgfile.dwgwallets.add(w)
|
||||
except:
|
||||
message = f" ! wallet not found referenced from {dwgfile} -- '{refs}' "
|
||||
except Exception as e:
|
||||
message = f" ! wallet not found referenced from {dwgfile} -- '{refs}' ({e}) "
|
||||
print(message)
|
||||
DataIssue.objects.create(parser="Tunnel", message=message, url=f"/dwgdataraw/{dwgfile}")
|
||||
|
||||
@@ -329,7 +327,7 @@ def load_drawings_files():
|
||||
but the upload form intentionally refuses to upload PNG and JPG (though it does allow SVG)
|
||||
"""
|
||||
all_xml = []
|
||||
drawdatadir = settings.DRAWINGS_DATA
|
||||
drawdatadir = Path(settings.DRAWINGS_DATA)
|
||||
DrawingFile.objects.all().delete()
|
||||
DataIssue.objects.filter(parser="drawings").delete()
|
||||
DataIssue.objects.filter(parser="Therion").delete()
|
||||
@@ -338,49 +336,45 @@ def load_drawings_files():
|
||||
if os.path.isfile("therionrefs.log"):
|
||||
os.remove("therionrefs.log")
|
||||
|
||||
drawingsdirs = [""]
|
||||
supported_extensions = {".txt", ".xml", ".th", ".th2", ".pdf", ".png", ".svg", ".jpg"} # set
|
||||
supported_extensions = {".txt", ".xml", ".th", ".th2", ".pdf", ".png", ".svg", ".jpg"}
|
||||
|
||||
while drawingsdirs:
|
||||
drawdir = drawingsdirs.pop()
|
||||
for f in os.listdir(os.path.join(drawdatadir, drawdir)):
|
||||
if f[0] == "." or f[-1] == "~":
|
||||
continue
|
||||
lf = os.path.join(drawdir, f)
|
||||
ff = os.path.join(drawdatadir, lf)
|
||||
if os.path.isdir(ff):
|
||||
drawingsdirs.append(
|
||||
lf
|
||||
) # lunatic! adding to list in middle of list while loop! Replace with pathlib functions.
|
||||
# Walk the tree with pathlib, skip hidden and backup files
|
||||
for p in drawdatadir.rglob('*'):
|
||||
if p.name.startswith('.') or p.name.endswith('~'):
|
||||
continue
|
||||
if p.is_dir():
|
||||
continue
|
||||
|
||||
suffix = p.suffix.lower()
|
||||
if suffix in supported_extensions or suffix == '':
|
||||
rel = p.relative_to(drawdatadir).as_posix()
|
||||
if suffix == '':
|
||||
dwgname = p.name
|
||||
ext = ''
|
||||
else:
|
||||
file_path = Path(f)
|
||||
suffix = file_path.suffix.lower()
|
||||
dwgname = p.stem
|
||||
ext = suffix[1:]
|
||||
|
||||
if suffix in supported_extensions:
|
||||
dwgfile = DrawingFile(dwgpath=lf, dwgname=file_path.stem)
|
||||
dwgfile.save()
|
||||
# Get the extension without the dot for the tuple.
|
||||
all_xml.append((suffix[1:], dwgfile))
|
||||
|
||||
elif suffix == "":
|
||||
# This handles the special case for files with no extension.
|
||||
dwgfile = DrawingFile(dwgpath=lf, dwgname=file_path.name)
|
||||
dwgfile.save()
|
||||
all_xml.append(("", dwgfile))
|
||||
dwgfile = DrawingFile(dwgpath=rel, dwgname=dwgname)
|
||||
dwgfile.save()
|
||||
all_xml.append((ext, dwgfile, p))
|
||||
|
||||
print(f" - {len(all_xml)} Drawings files found")
|
||||
|
||||
for d in all_xml:
|
||||
extension, filename = d
|
||||
if extension in {"pdf", "txt", "svg", "jpg", "png", ""}: # set
|
||||
# Process in a deterministic order; ensure .th2 are handled before .th
|
||||
ext_priority = {'th2': 0, 'th': 1}
|
||||
all_xml.sort(key=lambda t: ext_priority.get(t[0], 2))
|
||||
|
||||
for extension, filename, pathobj in all_xml:
|
||||
if extension in {"pdf", "txt", "svg", "jpg", "png", ""}:
|
||||
setdrwfileinfo(filename)
|
||||
if extension == "xml":
|
||||
elif extension == "xml":
|
||||
settnlfileinfo(filename)
|
||||
# important to import .th2 files before .th so that we can assign them when found in .th files
|
||||
if extension == "th2":
|
||||
settherionfileinfo(d)
|
||||
if extension == "th":
|
||||
settherionfileinfo(d)
|
||||
elif extension == "th2":
|
||||
settherionfileinfo(("th2", filename))
|
||||
elif extension == "th":
|
||||
settherionfileinfo(("th", filename))
|
||||
|
||||
print(f" - Drawings parsed")
|
||||
|
||||
|
||||
Reference in New Issue
Block a user