core.py (8191B)
import csv
import math
from pathlib import Path
import re


# All of these columns must be present for a header row to be recognized
# as TEMImage-style output.
TEMIMAGE_REQUIRED_COLUMNS = {'X', 'Y', 'Z', 'DOI', 'DataResidual', 'NumLayers'}
# All of these columns must be present for a header row to be recognized
# as Aarhus Workbench-style output.
AARHUS_REQUIRED_COLUMNS = {
    'LINE_NO',
    'X',
    'Y',
    'ELEVATION',
    'RECORD',
    'RESDATA',
    'RHO_1',
    'THK_1',
    'DOI_STANDARD',
}

# Matches e.g. "EPSG:25832" or "epsg : 25832" inside a comment line.
EPSG_PATTERN = re.compile(r"\bepsg\s*:\s*(\d+)\b", re.IGNORECASE)
# Alternate column spellings some exporters use, normalized to the
# canonical names referenced by the required-column sets above.
HEADER_ALIASES = {
    'UTMX': 'X',
    'UTMY': 'Y',
}


def normalize_header_tokens(line):
    """Split *line* on whitespace and return canonical header tokens.

    Leading '/' comment markers are stripped from each token, known
    aliases (UTMX/UTMY) are rewritten, and empty tokens are dropped.
    """
    normalized = []
    for token in line.split():
        token = token.lstrip('/')
        token = HEADER_ALIASES.get(token, token)
        if token:
            normalized.append(token)
    return normalized


def detect_format(headers):
    """Return 'temimage', 'aarhus', or None for a list of header names."""
    header_set = set(headers)
    if TEMIMAGE_REQUIRED_COLUMNS.issubset(header_set):
        return 'temimage'
    if AARHUS_REQUIRED_COLUMNS.issubset(header_set):
        return 'aarhus'
    return None


def is_header_line(line):
    """Return True if *line* normalizes to a recognized header row."""
    return detect_format(normalize_header_tokens(line)) is not None


def detect_source_epsg(path, comment_char='/'):
    """Scan the leading comment lines of *path* for an EPSG code.

    Scanning stops at the header row or at the first non-comment line,
    since EPSG declarations only appear in the comment preamble.
    Returns a string like 'EPSG:25832', or None if no code is found.
    """
    with open(path, 'r') as f:
        for line in f:
            stripped = line.lstrip()
            if is_header_line(stripped):
                break
            if not stripped.startswith(comment_char):
                break
            match = EPSG_PATTERN.search(stripped)
            if match is not None:
                return f'EPSG:{match.group(1)}'
    return None


def get_numbered_columns(headers, prefix):
    """Return headers of the form '<prefix><int>' sorted by their number.

    Non-numeric suffixes (e.g. 'RHO_STD') are ignored.  Sorting is
    numeric, so 'RHO_10' comes after 'RHO_9'.
    """
    numbered = []
    for name in headers:
        if not name.startswith(prefix):
            continue
        suffix = name[len(prefix):]
        if suffix.isdigit():
            numbered.append((int(suffix), name))
    return [name for _, name in sorted(numbered)]


def parse_num_layers(value):
    """Parse a layer count that may be written as a float (e.g. '30.0').

    Returns None for missing, unparseable, or non-finite values.  The
    finiteness check matters: the original NaN-only guard let 'inf'
    through to int(), which raises OverflowError.
    """
    try:
        parsed = float(value)
    except (ValueError, TypeError):
        return None
    if not math.isfinite(parsed):
        return None
    return int(parsed)


def count_valid_layers(row, res_cols, thick_cols):
    """Count leading layers whose resistivity and thickness both parse.

    Stops at the first pair that is missing, unparseable, or NaN — used
    for Aarhus rows, which carry no explicit layer count column.
    """
    count = 0
    for res_col, thick_col in zip(res_cols, thick_cols):
        try:
            res = float(row.get(res_col, ''))
            thick = float(row.get(thick_col, ''))
        except (ValueError, TypeError):
            break
        if math.isnan(res) or math.isnan(thick):
            break
        count += 1
    return count


def _build_layers(row, x, y, z, res_cols, thick_cols,
                  dep_top_cols, dep_bot_cols, max_layers):
    """Build the per-layer records for one station.

    Returns a list of dicts (see process_xyz for the schema).  Stops at
    the first layer whose resistivity or thickness is missing/NaN, the
    same truncation rule as count_valid_layers.
    """
    layers = []
    cum_depth = 0.0
    for i in range(max_layers):
        try:
            res = float(row.get(res_cols[i], ''))
            thick = float(row.get(thick_cols[i], ''))
        except (ValueError, TypeError):
            break
        if math.isnan(res) or math.isnan(thick):
            break

        # Prefer explicit DEP_TOP_n/DEP_BOT_n columns when present and
        # numeric; otherwise fall back to cumulative thickness.
        dep_top_col = dep_top_cols[i] if i < len(dep_top_cols) else None
        dep_bot_col = dep_bot_cols[i] if i < len(dep_bot_cols) else None
        if dep_top_col and dep_bot_col:
            try:
                depth_top = float(row[dep_top_col])
                depth_bottom = float(row[dep_bot_col])
                if math.isnan(depth_top) or math.isnan(depth_bottom):
                    raise ValueError
            except (ValueError, TypeError):
                depth_top = cum_depth
                depth_bottom = cum_depth + thick
        else:
            depth_top = cum_depth
            depth_bottom = cum_depth + thick

        z_top = z - depth_top
        z_bot = z - depth_bottom
        cum_depth = depth_bottom

        layers.append({
            'X': x,
            'Y': y,
            'Z': z,
            'ZTop': z_top,
            'ZMid': (z_top + z_bot) / 2,
            'ZBottom': z_bot,
            'DepthTop': depth_top,
            'DepthBottom': depth_bottom,
            'Resistivity': res,
            'Layer': i + 1,
            'Geometry': f'LINESTRING Z ({x} {y} {z_top}, {x} {y} {z_bot})',
        })
    return layers


def process_xyz(path):
    """Parse a TEMImage or Aarhus Workbench XYZ export.

    Returns (points, doi_points, layers), each a list of dicts carrying
    WKT geometry strings: one station point, one depth-of-investigation
    point, and one vertical segment per resistivity layer.

    Raises ValueError if no supported header row is found, or if a data
    row's column count does not match the header.
    """
    points = []
    doi_points = []
    layers = []
    headers = None

    with open(path, 'r') as f:
        for line_number, raw_line in enumerate(f, start=1):
            stripped = raw_line.strip()
            left_stripped = raw_line.lstrip()

            if headers is None:
                # Still in the preamble: look for the header row, and
                # only tolerate '/'-prefixed comment lines before it.
                if is_header_line(left_stripped):
                    headers = normalize_header_tokens(left_stripped)
                    source_format = detect_format(headers)
                    if source_format == 'temimage':
                        res_cols = get_numbered_columns(headers, 'Res_')
                        thick_cols = get_numbered_columns(headers, 'Thick_')
                    else:
                        res_cols = get_numbered_columns(headers, 'RHO_')
                        thick_cols = get_numbered_columns(headers, 'THK_')
                    # Computed for both formats (empty lists when the
                    # header has no DEP_TOP_n/DEP_BOT_n columns) so the
                    # layer loop below can never hit an unbound name.
                    dep_top_cols = get_numbered_columns(headers, 'DEP_TOP_')
                    dep_bot_cols = get_numbered_columns(headers, 'DEP_BOT_')
                    continue
                if not left_stripped.startswith('/'):
                    raise ValueError('XYZ file does not contain a supported header row')
                continue

            if not stripped:
                continue

            values = stripped.split()
            # When the data has one more column than the header, pandas would
            # treat the first column as the row index. Replicate that here.
            if len(values) == len(headers) + 1:
                values = values[1:]
            if len(values) != len(headers):
                raise ValueError(
                    f'Row {line_number} has {len(values)} columns, '
                    f'expected {len(headers)}'
                )

            row = dict(zip(headers, values))
            x = float(row['X'])
            y = float(row['Y'])

            if source_format == 'temimage':
                z = float(row['Z'])
                doi = float(row['DOI'])
                data_residual = float(row['DataResidual'])
                n_layers = parse_num_layers(row['NumLayers'])
                # NOTE(review): 'Line'/'StationNo' are not in the
                # required-column set; a TEMImage file lacking them will
                # raise KeyError here — presumably they are always
                # present in practice; confirm against real exports.
                line = row['Line']
                station_no = row['StationNo']
            else:
                z = float(row['ELEVATION'])
                doi = float(row['DOI_STANDARD'])
                data_residual = float(row['RESDATA'])
                line = str(int(float(row['LINE_NO'])))
                record = int(float(row['RECORD']))
                station_no = f'{line}_{record:05d}'
                n_layers = count_valid_layers(row, res_cols, thick_cols)

            z_doi = z - doi

            points.append({
                'X': x,
                'Y': y,
                'Z': z,
                'Line': line,
                'StationNo': station_no,
                'DataResidual': data_residual,
                'NumLayers': n_layers,
                'Geometry': f'POINT Z ({x} {y} {z})',
            })
            doi_points.append({
                'X': x,
                'Y': y,
                'Z': z_doi,
                'DOI': doi,
                'ZDOI': z_doi,
                'Geometry': f'POINT Z ({x} {y} {z_doi})',
            })

            # Never emit more layers than the file declares (temimage)
            # or than there are res/thick column pairs.
            max_layers = min(len(res_cols), len(thick_cols))
            if n_layers is not None:
                max_layers = min(max_layers, n_layers)
            layers.extend(_build_layers(
                row, x, y, z, res_cols, thick_cols,
                dep_top_cols, dep_bot_cols, max_layers,
            ))

    if headers is None:
        raise ValueError('XYZ file does not contain a supported header row')

    return points, doi_points, layers


def write_csv(rows, base_path, suffix):
    """Write *rows* (list of dicts) as CSV next to *base_path*.

    *suffix* must include the leading dot (Path.with_suffix requirement).
    An empty row list produces an empty file with no header.  Returns
    the output Path.
    """
    out_path = Path(base_path).with_suffix(suffix)
    if not rows:
        out_path.write_text('')
        return out_path
    with open(out_path, 'w', newline='') as f:
        writer = csv.DictWriter(f, fieldnames=list(rows[0].keys()))
        writer.writeheader()
        writer.writerows(rows)
    return out_path