mirror of
https://expo.survex.com/repositories/troggle/.git
synced 2025-12-18 21:47:21 +00:00
refactoring
This commit is contained in:
@@ -7,6 +7,9 @@ from django.test import TestCase
|
|||||||
import settings
|
import settings
|
||||||
from troggle.parsers import drawings
|
from troggle.parsers import drawings
|
||||||
from troggle.core.models.survex import DrawingFile
|
from troggle.core.models.survex import DrawingFile
|
||||||
|
from troggle.core.models.wallets import Wallet
|
||||||
|
from troggle.core.models.survex import SingleScan
|
||||||
|
from troggle.core.models.troggle import DataIssue
|
||||||
|
|
||||||
|
|
||||||
class DrawingsPathlibTests(TestCase):
|
class DrawingsPathlibTests(TestCase):
|
||||||
@@ -83,3 +86,43 @@ class DrawingsPathlibTests(TestCase):
|
|||||||
drawings.load_drawings_files()
|
drawings.load_drawings_files()
|
||||||
|
|
||||||
self.assertEqual(DrawingFile.objects.count(), count)
|
self.assertEqual(DrawingFile.objects.count(), count)
|
||||||
|
|
||||||
|
def test_parse_tunnel_links_wallet_and_scan(self):
|
||||||
|
# Create a wallet and a singlescan, then ensure parse_tnl_file links them
|
||||||
|
w = Wallet.objects.create(fpath='x', walletname='2025#20')
|
||||||
|
ss = SingleScan.objects.create(ffile='x', name='notes.jpg', wallet=w)
|
||||||
|
df = DrawingFile.objects.create(dwgpath='tst.th', dwgname='tst')
|
||||||
|
|
||||||
|
drawings.parse_tnl_file(df, '2025#20/notes.jpg')
|
||||||
|
|
||||||
|
self.assertIn(w, df.dwgwallets.all())
|
||||||
|
self.assertIn(ss, df.scans.all())
|
||||||
|
|
||||||
|
def test_findwalletimage_logs_missing_scan(self):
|
||||||
|
# Wallet exists but no scan inside. Should create a DataIssue
|
||||||
|
w = Wallet.objects.create(fpath='x', walletname='2026#01')
|
||||||
|
df = DrawingFile.objects.create(dwgpath='tst2.th2', dwgname='tst2')
|
||||||
|
|
||||||
|
drawings.findwalletimage(df, '2026#01/missing.jpg')
|
||||||
|
|
||||||
|
di = DataIssue.objects.filter(parser='Therion', message__contains='not actually found')
|
||||||
|
self.assertTrue(di.exists())
|
||||||
|
|
||||||
|
def test_drawing_reference_multiple_creates_dataissue(self):
|
||||||
|
df1 = DrawingFile.objects.create(dwgpath='ref1', dwgname='shared')
|
||||||
|
df2 = DrawingFile.objects.create(dwgpath='ref2', dwgname='shared')
|
||||||
|
dfmain = DrawingFile.objects.create(dwgpath='main', dwgname='main')
|
||||||
|
|
||||||
|
drawings.parse_tnl_file(dfmain, 'shared')
|
||||||
|
|
||||||
|
di = DataIssue.objects.filter(parser='Tunnel', message__contains="files named 'shared'")
|
||||||
|
self.assertTrue(di.exists())
|
||||||
|
|
||||||
|
def test_drawing_reference_single_no_dataissue(self):
|
||||||
|
DrawingFile.objects.create(dwgpath='ref3', dwgname='unique')
|
||||||
|
dfmain = DrawingFile.objects.create(dwgpath='main2', dwgname='main2')
|
||||||
|
|
||||||
|
drawings.parse_tnl_file(dfmain, 'unique')
|
||||||
|
|
||||||
|
di = DataIssue.objects.filter(parser='Tunnel', message__contains="files named 'unique'")
|
||||||
|
self.assertFalse(di.exists())
|
||||||
|
|||||||
@@ -14,10 +14,6 @@ for tunnel and therion files
|
|||||||
todo = """
|
todo = """
|
||||||
- Rename functions more consistently between tunnel and therion variants
|
- Rename functions more consistently between tunnel and therion variants
|
||||||
|
|
||||||
- Refactor to use pathlib instead of whacky resetting of loop variable inside loop
|
|
||||||
to scan sub-folders. This will definitely break at some point..
|
|
||||||
|
|
||||||
- Recode rx_valid_ext to use pathlib suffix() function
|
|
||||||
|
|
||||||
- implement: findimportinsert(therionfile, imp)
|
- implement: findimportinsert(therionfile, imp)
|
||||||
Tries to link the scrap (Therion format) to the referenced therion scrap
|
Tries to link the scrap (Therion format) to the referenced therion scrap
|
||||||
@@ -115,17 +111,30 @@ def parse_tnl_file(dwgfile, path):
|
|||||||
This is used to tie drawings to the wallet, and thus the original survey data. Tunnel files
|
This is used to tie drawings to the wallet, and thus the original survey data. Tunnel files
|
||||||
contain a centreline which is an embedded survex file.
|
contain a centreline which is an embedded survex file.
|
||||||
"""
|
"""
|
||||||
|
# Delegate to the unified reference processor for consistent behaviour
|
||||||
|
_process_reference(dwgfile, path, parser_label="Tunnel")
|
||||||
|
|
||||||
|
|
||||||
|
def _process_reference(dwgfile, path, parser_label="Tunnel"):
|
||||||
|
"""Unified processor to link drawing files to wallets/scans or referenced drawings.
|
||||||
|
|
||||||
|
- If `path` matches a wallet pattern (rx_wallet), link the wallet and try to find the scan file in the wallet.
|
||||||
|
- If `path` looks like an image, do nothing (images are not treated as references here).
|
||||||
|
- Otherwise, treat `path` as a possible reference to another drawing (by name) and link via `dwgcontains`.
|
||||||
|
"""
|
||||||
|
|
||||||
wallet, scansfile = None, None
|
wallet, scansfile = None, None
|
||||||
if mscansdir := rx_wallet.search(path): # walrus
|
if not path:
|
||||||
# print(f"{path} -- {mscansdir.group(1)=} -- {mscansdir.group(2)=}")
|
return None, None
|
||||||
|
|
||||||
|
if mscansdir := rx_wallet.search(path):
|
||||||
scanswalletl = Wallet.objects.filter(walletname=mscansdir.group(1))
|
scanswalletl = Wallet.objects.filter(walletname=mscansdir.group(1))
|
||||||
# This should be changed to properly detect if a list of folders is returned and do something sensible, not just pick the first. e.g. use the __in Django idiom
|
|
||||||
if len(scanswalletl):
|
if len(scanswalletl):
|
||||||
wallet = scanswalletl[0]
|
wallet = scanswalletl[0]
|
||||||
if len(scanswalletl) > 1:
|
if len(scanswalletl) > 1:
|
||||||
message = f"! More than one scan FOLDER matches filter query. [{scanswalletl[0]}]: {mscansdir.group(1)} {mscansdir.group(2)} {dwgfile.dwgpath} {path}"
|
message = f"! More than one scan FOLDER matches filter query. [{scanswalletl[0]}]: {mscansdir.group(1)} {mscansdir.group(2)} {dwgfile.dwgpath} {path}"
|
||||||
print(message)
|
print(message)
|
||||||
DataIssue.objects.create(parser="Tunnel", message=message)
|
DataIssue.objects.create(parser=parser_label, message=message)
|
||||||
|
|
||||||
if wallet:
|
if wallet:
|
||||||
scansfilel = wallet.singlescan_set.filter(name=mscansdir.group(2))
|
scansfilel = wallet.singlescan_set.filter(name=mscansdir.group(2))
|
||||||
@@ -134,31 +143,57 @@ def parse_tnl_file(dwgfile, path):
|
|||||||
plist = [sf.ffile for sf in scansfilel]
|
plist = [sf.ffile for sf in scansfilel]
|
||||||
message = f"! More than one image FILENAME matches filter query. [{scansfilel[0]}]: {mscansdir.group(1)} {mscansdir.group(2)} {dwgfile.dwgpath} {path} {plist}"
|
message = f"! More than one image FILENAME matches filter query. [{scansfilel[0]}]: {mscansdir.group(1)} {mscansdir.group(2)} {dwgfile.dwgpath} {path} {plist}"
|
||||||
print(message)
|
print(message)
|
||||||
DataIssue.objects.create(parser="Tunnel", message=message)
|
DataIssue.objects.create(parser=parser_label, message=message)
|
||||||
scansfile = scansfilel[0]
|
scansfile = scansfilel[0]
|
||||||
|
|
||||||
if wallet:
|
if wallet:
|
||||||
dwgfile.dwgwallets.add(wallet)
|
dwgfile.dwgwallets.add(wallet)
|
||||||
if scansfile:
|
if scansfile:
|
||||||
dwgfile.scans.add(scansfile)
|
dwgfile.scans.add(scansfile)
|
||||||
|
return wallet, scansfile
|
||||||
|
|
||||||
elif path:
|
# Not a wallet reference; check image extension and possibly drawing-to-drawing reference
|
||||||
suffix = Path(path).suffix.lower()
|
suffix = Path(path).suffix.lower()
|
||||||
if suffix in IMAGE_EXTS:
|
if suffix in IMAGE_EXTS:
|
||||||
# It's an image/scanned file type; we don't treat it as a referenced drawing
|
# It's an image/scanned file type; we don't treat it as a referenced drawing
|
||||||
return
|
return
|
||||||
# Not an image file: perhaps a reference to another drawing (no ext or other ext)
|
|
||||||
name = Path(path).name
|
# Not an image file: perhaps a reference to another drawing (no ext or other ext)
|
||||||
rdwgfilel = DrawingFile.objects.filter(dwgname=name) # Check if it is another drawing file we have already seen
|
name = Path(path).name
|
||||||
if len(rdwgfilel):
|
rdwgfilel = DrawingFile.objects.filter(dwgname=name) # Check if it is another drawing file we have already seen
|
||||||
if len(rdwgfilel) > 1:
|
if len(rdwgfilel):
|
||||||
plist = []
|
if len(rdwgfilel) > 1:
|
||||||
for df in rdwgfilel:
|
plist = [df.dwgpath for df in rdwgfilel]
|
||||||
plist.append(df.dwgpath)
|
message = f"- Warning {len(rdwgfilel)} files named '{name}' {plist}"
|
||||||
message = f"- Warning {len(rdwgfilel)} files named '{name}' {plist}" # should not be a problem?
|
print(message)
|
||||||
print(message)
|
DataIssue.objects.create(parser=parser_label, message=message, url=f"/dwgdataraw/{path}")
|
||||||
DataIssue.objects.create(parser="Tunnel", message=message, url=f"/dwgdataraw/{path}")
|
rdwgfile = rdwgfilel[0]
|
||||||
rdwgfile = rdwgfilel[0]
|
if hasattr(dwgfile, 'dwgcontains'):
|
||||||
|
dwgfile.dwgcontains.add(rdwgfile)
|
||||||
|
|
||||||
|
dwgfile.save()
|
||||||
|
return None, None
|
||||||
|
|
||||||
|
|
||||||
|
# Not a wallet reference; check image extension and possibly drawing-to-drawing reference
|
||||||
|
suffix = Path(path).suffix.lower()
|
||||||
|
if suffix in IMAGE_EXTS:
|
||||||
|
# It's an image/scanned file type; we don't treat it as a referenced drawing
|
||||||
|
return
|
||||||
|
|
||||||
|
# Not an image file: perhaps a reference to another drawing (no ext or other ext)
|
||||||
|
name = Path(path).name
|
||||||
|
rdwgfilel = DrawingFile.objects.filter(dwgname=name) # Check if it is another drawing file we have already seen
|
||||||
|
if len(rdwgfilel):
|
||||||
|
if len(rdwgfilel) > 1:
|
||||||
|
plist = []
|
||||||
|
for df in rdwgfilel:
|
||||||
|
plist.append(df.dwgpath)
|
||||||
|
message = f"- Warning {len(rdwgfilel)} files named '{name}' {plist}" # should not be a problem?
|
||||||
|
print(message)
|
||||||
|
DataIssue.objects.create(parser="Tunnel", message=message, url=f"/dwgdataraw/{path}")
|
||||||
|
rdwgfile = rdwgfilel[0]
|
||||||
|
if hasattr(dwgfile, 'dwgcontains'):
|
||||||
dwgfile.dwgcontains.add(rdwgfile)
|
dwgfile.dwgcontains.add(rdwgfile)
|
||||||
|
|
||||||
dwgfile.save()
|
dwgfile.save()
|
||||||
@@ -166,38 +201,16 @@ def parse_tnl_file(dwgfile, path):
|
|||||||
|
|
||||||
def findwalletimage(therionfile, foundpath):
|
def findwalletimage(therionfile, foundpath):
|
||||||
"""Tries to link the drawing file (Therion format) to the referenced image (scan) file"""
|
"""Tries to link the drawing file (Therion format) to the referenced image (scan) file"""
|
||||||
wallet, scansfile = None, None
|
# Delegate to the unified reference processor for consistent behaviour
|
||||||
foundpath = foundpath.strip("{}")
|
foundpath = foundpath.strip("{}")
|
||||||
mscansdir = rx_wallet.search(foundpath)
|
wallet, scansfile = _process_reference(therionfile, foundpath, parser_label="Therion")
|
||||||
if mscansdir:
|
|
||||||
scanswalletl = Wallet.objects.filter(walletname=mscansdir.group(1))
|
|
||||||
# This should be changed to properly detect if a list of folders is returned and do something sensible, not just pick the first. Use the __in idom
|
|
||||||
if len(scanswalletl):
|
|
||||||
wallet = scanswalletl[0]
|
|
||||||
if len(scanswalletl) > 1:
|
|
||||||
message = f"! More than one scan FOLDER matches filter query. [{therionfile}]: {mscansdir.group(1)} {foundpath}"
|
|
||||||
print(message)
|
|
||||||
DataIssue.objects.create(parser="Therion", message=message)
|
|
||||||
if wallet:
|
|
||||||
therionfile.dwgwallets.add(wallet)
|
|
||||||
|
|
||||||
scanfilename = Path(foundpath).name
|
# If a wallet was found but no scan was associated from the wallet, record a DataIssue
|
||||||
scansfilel = wallet.singlescan_set.filter(name=scanfilename, wallet=wallet)
|
if wallet and not scansfile:
|
||||||
if len(scansfilel):
|
scanfilename = Path(foundpath).name
|
||||||
# message = f'! {len(scansfilel)} {scansfilel} = {scanfilename} found in the wallet specified {wallet.walletname}'
|
message = f'! In {wallet.walletname} scanned file is not actually found {scanfilename} mentioned in "{therionfile.dwgpath}"'
|
||||||
# print(message)
|
wurl = f"/survey_scans/{wallet.walletname}/".replace("#", ":")
|
||||||
if len(scansfilel) > 1:
|
DataIssue.objects.create(parser="Therion", message=message, url=wurl)
|
||||||
plist = [sf.ffile for sf in scansfilel]
|
|
||||||
message = f"! More than one image FILENAME matches filter query. [{scansfilel[0]}]: {mscansdir.group(1)} {foundpath} {plist}"
|
|
||||||
print(message)
|
|
||||||
DataIssue.objects.create(parser="Therion", message=message)
|
|
||||||
scansfile = scansfilel[0]
|
|
||||||
therionfile.scans.add(scansfile)
|
|
||||||
else:
|
|
||||||
message = f'! In {wallet.walletname} scanned file is not actually found {scanfilename} mentioned in "{therionfile.dwgpath}"'
|
|
||||||
wurl = f"/survey_scans/{wallet.walletname}/".replace("#", ":")
|
|
||||||
# print(message)
|
|
||||||
DataIssue.objects.create(parser="Therion", message=message, url=wurl)
|
|
||||||
|
|
||||||
|
|
||||||
def findimportinsert(therionfile, imp):
|
def findimportinsert(therionfile, imp):
|
||||||
|
|||||||
Reference in New Issue
Block a user