qgis-tem-loader

QGIS plugin for loading TEM geophysical inversion XYZ files as 3D objects
git clone git://src.adamsgaard.dk/qgis-tem-loader # fast
git clone https://src.adamsgaard.dk/qgis-tem-loader.git # slow
Log | Files | Refs | README | LICENSE Back to index

core.py (8191B)


      1 import csv
      2 import math
      3 from pathlib import Path
      4 import re
      5 
      6 
# Column names that must all be present in a header row for it to be
# recognized as the 'temimage' XYZ dialect.
TEMIMAGE_REQUIRED_COLUMNS = {'X', 'Y', 'Z', 'DOI', 'DataResidual', 'NumLayers'}
# Column names that must all be present for the 'aarhus' XYZ dialect.
AARHUS_REQUIRED_COLUMNS = {
    'LINE_NO',
    'X',
    'Y',
    'ELEVATION',
    'RECORD',
    'RESDATA',
    'RHO_1',
    'THK_1',
    'DOI_STANDARD',
}

# Matches an 'epsg:<code>' declaration (case-insensitive, optional spaces
# around the colon) inside leading comment lines; group 1 is the code.
EPSG_PATTERN = re.compile(r"\bepsg\s*:\s*(\d+)\b", re.IGNORECASE)
# Header tokens rewritten to canonical column names during normalization.
HEADER_ALIASES = {
    'UTMX': 'X',
    'UTMY': 'Y',
}
     25 
     26 
     27 def normalize_header_tokens(line):
     28     normalized = []
     29     for token in line.split():
     30         token = token.lstrip('/')
     31         token = HEADER_ALIASES.get(token, token)
     32         if token:
     33             normalized.append(token)
     34     return normalized
     35 
     36 
     37 def detect_format(headers):
     38     header_set = set(headers)
     39     if TEMIMAGE_REQUIRED_COLUMNS.issubset(header_set):
     40         return 'temimage'
     41     if AARHUS_REQUIRED_COLUMNS.issubset(header_set):
     42         return 'aarhus'
     43     return None
     44 
     45 
     46 def is_header_line(line):
     47     return detect_format(normalize_header_tokens(line)) is not None
     48 
     49 
     50 def detect_source_epsg(path, comment_char='/'):
     51     with open(path, 'r') as f:
     52         for line in f:
     53             stripped = line.lstrip()
     54             if is_header_line(stripped):
     55                 break
     56             if not stripped.startswith(comment_char):
     57                 break
     58             match = EPSG_PATTERN.search(stripped)
     59             if match is not None:
     60                 return f'EPSG:{match.group(1)}'
     61     return None
     62 
     63 
     64 def get_numbered_columns(headers, prefix):
     65     numbered = []
     66     for name in headers:
     67         if not name.startswith(prefix):
     68             continue
     69         suffix = name[len(prefix):]
     70         if suffix.isdigit():
     71             numbered.append((int(suffix), name))
     72     return [name for _, name in sorted(numbered)]
     73 
     74 
     75 def parse_num_layers(value):
     76     try:
     77         parsed = float(value)
     78     except (ValueError, TypeError):
     79         return None
     80     if math.isnan(parsed):
     81         return None
     82     return int(parsed)
     83 
     84 
     85 def count_valid_layers(row, res_cols, thick_cols):
     86     count = 0
     87     for res_col, thick_col in zip(res_cols, thick_cols):
     88         try:
     89             res = float(row.get(res_col, ''))
     90             thick = float(row.get(thick_col, ''))
     91         except (ValueError, TypeError):
     92             break
     93         if math.isnan(res) or math.isnan(thick):
     94             break
     95         count += 1
     96     return count
     97 
     98 
def process_xyz(path):
    """Parse a TEM inversion XYZ file into point, DOI, and layer records.

    Supports the two header dialects recognized by detect_format
    ('temimage' and 'aarhus'). Returns a tuple
    (points, doi_points, layers) of lists of dicts; each dict carries a
    WKT geometry string under 'Geometry'.

    Raises ValueError when no supported header row is found, or when a
    data row's column count does not match the header.
    """
    points = []
    doi_points = []
    layers = []
    headers = None

    with open(path, 'r') as f:
        for line_number, raw_line in enumerate(f, start=1):
            stripped = raw_line.strip()
            left_stripped = raw_line.lstrip()

            if headers is None:
                # Still searching for the header row; only '/'-prefixed
                # comment lines may precede it.
                if is_header_line(left_stripped):
                    headers = normalize_header_tokens(left_stripped)
                    source_format = detect_format(headers)
                    # Resistivity/thickness column names differ per dialect.
                    if source_format == 'temimage':
                        res_cols = get_numbered_columns(headers, 'Res_')
                        thick_cols = get_numbered_columns(headers, 'Thick_')
                    else:
                        res_cols = get_numbered_columns(headers, 'RHO_')
                        thick_cols = get_numbered_columns(headers, 'THK_')
                    # Optional explicit layer-depth columns; when absent,
                    # depths are accumulated from thicknesses below.
                    dep_top_cols = get_numbered_columns(headers, 'DEP_TOP_')
                    dep_bot_cols = get_numbered_columns(headers, 'DEP_BOT_')
                    continue
                if not left_stripped.startswith('/'):
                    raise ValueError('XYZ file does not contain a supported header row')
                continue

            if not stripped:
                continue

            values = stripped.split()
            # When the data has one more column than the header, pandas would
            # treat the first column as the row index. Replicate that here.
            if len(values) == len(headers) + 1:
                values = values[1:]
            if len(values) != len(headers):
                raise ValueError(
                    f'Row {line_number} has {len(values)} columns, '
                    f'expected {len(headers)}'
                )

            row = dict(zip(headers, values))
            x = float(row['X'])
            y = float(row['Y'])

            if source_format == 'temimage':
                z = float(row['Z'])
                doi = float(row['DOI'])
                data_residual = float(row['DataResidual'])
                n_layers = parse_num_layers(row['NumLayers'])
                # NOTE(review): 'Line' and 'StationNo' are not in
                # TEMIMAGE_REQUIRED_COLUMNS, so a temimage file lacking
                # them raises KeyError here — confirm against real files.
                line = row['Line']
                station_no = row['StationNo']
            else:
                z = float(row['ELEVATION'])
                doi = float(row['DOI_STANDARD'])
                data_residual = float(row['RESDATA'])
                # Numeric fields may be written as floats (e.g. '12.0');
                # parse via float before truncating to int.
                line = str(int(float(row['LINE_NO'])))
                record = int(float(row['RECORD']))
                station_no = f'{line}_{record:05d}'
                # No explicit layer count in this dialect; infer it from
                # the first invalid resistivity/thickness pair.
                n_layers = count_valid_layers(row, res_cols, thick_cols)

            # Depth of investigation expressed as an absolute elevation.
            z_doi = z - doi
            point_wkt = f'POINT Z ({x} {y} {z})'
            doi_wkt = f'POINT Z ({x} {y} {z_doi})'

            points.append({
                'X': x,
                'Y': y,
                'Z': z,
                'Line': line,
                'StationNo': station_no,
                'DataResidual': data_residual,
                'NumLayers': n_layers,
                'Geometry': point_wkt,
            })
            doi_points.append({
                'X': x,
                'Y': y,
                'Z': z_doi,
                'DOI': doi,
                'ZDOI': z_doi,
                'Geometry': doi_wkt,
            })

            # Emit one vertical segment per model layer, capped by the
            # declared layer count when one is available.
            max_layers = min(len(res_cols), len(thick_cols))
            if n_layers is not None:
                max_layers = min(max_layers, n_layers)

            cum_depth = 0.0
            for i in range(max_layers):
                res_col = res_cols[i]
                thick_col = thick_cols[i]
                res_val = row.get(res_col, '')
                thick_val = row.get(thick_col, '')
                try:
                    res = float(res_val)
                    thick = float(thick_val)
                except (ValueError, TypeError):
                    break
                if math.isnan(res) or math.isnan(thick):
                    break

                # Prefer explicit DEP_TOP_/DEP_BOT_ values; fall back to
                # cumulative thickness when absent or non-numeric.
                dep_top_col = dep_top_cols[i] if i < len(dep_top_cols) else None
                dep_bot_col = dep_bot_cols[i] if i < len(dep_bot_cols) else None
                if dep_top_col and dep_bot_col:
                    try:
                        depth_top = float(row[dep_top_col])
                        depth_bottom = float(row[dep_bot_col])
                        if math.isnan(depth_top) or math.isnan(depth_bottom):
                            raise ValueError
                    except (ValueError, TypeError):
                        depth_top = cum_depth
                        depth_bottom = cum_depth + thick
                else:
                    depth_top = cum_depth
                    depth_bottom = cum_depth + thick

                # Convert depths (positive downward) to elevations.
                z_top = z - depth_top
                z_bot = z - depth_bottom
                z_mid = (z_top + z_bot) / 2
                cum_depth = depth_bottom

                layer_wkt = f'LINESTRING Z ({x} {y} {z_top}, {x} {y} {z_bot})'
                layers.append({
                    'X': x,
                    'Y': y,
                    'Z': z,
                    'ZTop': z_top,
                    'ZMid': z_mid,
                    'ZBottom': z_bot,
                    'DepthTop': depth_top,
                    'DepthBottom': depth_bottom,
                    'Resistivity': res,
                    'Layer': i + 1,
                    'Geometry': layer_wkt,
                })

    if headers is None:
        raise ValueError('XYZ file does not contain a supported header row')

    return points, doi_points, layers
    241 
    242 
    243 def write_csv(rows, base_path, suffix):
    244     out_path = Path(base_path).with_suffix(suffix)
    245     if not rows:
    246         out_path.write_text('')
    247         return out_path
    248     with open(out_path, 'w', newline='') as f:
    249         writer = csv.DictWriter(f, fieldnames=list(rows[0].keys()))
    250         writer.writeheader()
    251         writer.writerows(rows)
    252     return out_path