core.py (16718B)
1 import csv 2 import math 3 from pathlib import Path 4 import re 5 6 7 TEMIMAGE_REQUIRED_COLUMNS = {'X', 'Y', 'Z', 'DOI', 'DataResidual', 'NumLayers'} 8 AARHUS_REQUIRED_COLUMNS = { 9 'LINE_NO', 10 'X', 11 'Y', 12 'ELEVATION', 13 'RECORD', 14 'RESDATA', 15 'RHO_1', 16 'THK_1', 17 'DOI_STANDARD', 18 } 19 SCI_REQUIRED_COLUMNS = { 20 'Line_No', 21 'Layer_No', 22 'X', 23 'Y', 24 'Elevation_Cell', 25 'Resistivity', 26 'Depth_top', 27 'Depth_bottom', 28 'Thickness', 29 } 30 ATEM_SCI_REQUIRED_COLUMNS = { 31 'FID', 32 'LINE', 33 'E', 34 'N', 35 'DEM', 36 'RESI1', 37 'DOI', 38 'Res001', 39 'Elev001', 40 } 41 42 EPSG_PATTERN = re.compile(r"\bepsg\s*:\s*(\d+)\b", re.IGNORECASE) 43 HEADER_ALIASES = { 44 'UTMX': 'X', 45 'UTMY': 'Y', 46 } 47 48 ABOVE_DOI_OPACITY = 100 49 BELOW_DOI_OPACITY = 10 50 51 52 def validate_opacity(value): 53 try: 54 opacity = int(value) 55 except (TypeError, ValueError): 56 raise ValueError('opacity must be between 0 and 100') from None 57 if opacity < 0 or opacity > 100: 58 raise ValueError('opacity must be between 0 and 100') 59 return opacity 60 61 62 def layer_opacity( 63 depth_mid, 64 doi, 65 mask_below_doi=True, 66 below_doi_opacity=BELOW_DOI_OPACITY, 67 ): 68 below_doi_opacity = validate_opacity(below_doi_opacity) 69 if doi is None or not mask_below_doi: 70 return ABOVE_DOI_OPACITY 71 return ABOVE_DOI_OPACITY if depth_mid <= doi else below_doi_opacity 72 73 74 # Resistivity ranges mirror the graduated symbology in styles/layers.qml. 75 # Each entry is (upper_exclusive, '#RRGGBB'); the final entry is the catch-all. 76 RESISTIVITY_COLOR_RAMP = ( 77 (0.1, '#000091'), 78 (1.0, '#000091'), 79 (2.0, '#0000b4'), 80 (4.0, '#0032dc'), 81 (7.0, '#005af5'), 82 (10.0, '#008cff'), 83 (15.0, '#00beff'), 84 (20.0, '#00dcff'), 85 (25.0, '#00ffff'), 86 (30.0, '#00ff96'), 87 (35.0, '#00ff00'), 88 (40.0, '#96ff00'), 89 (50.0, '#d2ff00'), 90 (60.0, '#ffff00'), 91 (75.0, '#ffb500'), 92 (90.0, '#ff7300'), 93 (120.0, '#ff0000'), 94 (150.0, '#ff1c8d'), 95 (200.0, '#ff6aff'), 96 (250.0, '#f200f2'), 97 (300.0, '#ca00ca'), 98 (400.0, '#a600a6'), 99 (600.0, '#800080'), 100 (1600.0, '#750075'), 101 (float('inf'), '#540054'), 102 ) 103 UNKNOWN_RESISTIVITY_COLOR = '#ffffff' 104 105 106 def resistivity_color(value): 107 if value is None: 108 return UNKNOWN_RESISTIVITY_COLOR 109 try: 110 v = float(value) 111 except (ValueError, TypeError): 112 return UNKNOWN_RESISTIVITY_COLOR 113 if math.isnan(v): 114 return UNKNOWN_RESISTIVITY_COLOR 115 for upper, color in RESISTIVITY_COLOR_RAMP: 116 if v < upper: 117 return color 118 return RESISTIVITY_COLOR_RAMP[-1][1] 119 120 121 def normalize_header_tokens(line): 122 normalized = [] 123 for token in line.split(): 124 token = token.lstrip('/') 125 token = HEADER_ALIASES.get(token, token) 126 if token: 127 normalized.append(token) 128 return normalized 129 130 131 def detect_format(headers): 132 header_set = set(headers) 133 if TEMIMAGE_REQUIRED_COLUMNS.issubset(header_set): 134 return 'temimage' 135 if AARHUS_REQUIRED_COLUMNS.issubset(header_set): 136 return 'aarhus' 137 if SCI_REQUIRED_COLUMNS.issubset(header_set): 138 return 'sci' 139 if ATEM_SCI_REQUIRED_COLUMNS.issubset(header_set): 140 return 'atem_sci' 141 return None 142 143 144 def is_header_line(line): 145 return detect_format(normalize_header_tokens(line)) is not None 146 147 148 def detect_source_epsg(path, comment_char='/'): 149 with open(path, 'r') as f: 150 for line in f: 151 stripped = line.lstrip() 152 if is_header_line(stripped): 153 break 154 if not stripped.startswith(comment_char): 155 break 156 match = EPSG_PATTERN.search(stripped) 157 if match is not None: 158 return f'EPSG:{match.group(1)}' 159 return None 160 161 162 def get_numbered_columns(headers, prefix): 163 numbered = [] 164 for name in headers: 165 if not name.startswith(prefix): 166 continue 167 suffix = name[len(prefix):] 168 if suffix.isdigit(): 169 numbered.append((int(suffix), name)) 170 return [name for _, name in sorted(numbered)] 171 172 173 def parse_num_layers(value): 174 try: 175 parsed = float(value) 176 except (ValueError, TypeError): 177 return None 178 if math.isnan(parsed): 179 return None 180 return int(parsed) 181 182 183 def count_valid_layers(row, res_cols, thick_cols): 184 count = 0 185 for res_col, thick_col in zip(res_cols, thick_cols): 186 try: 187 res = float(row.get(res_col, '')) 188 thick = float(row.get(thick_col, '')) 189 except (ValueError, TypeError): 190 break 191 if math.isnan(res) or math.isnan(thick): 192 break 193 count += 1 194 return count 195 196 197 def process_xyz(path, mask_below_doi=True, below_doi_opacity=BELOW_DOI_OPACITY): 198 below_doi_opacity = validate_opacity(below_doi_opacity) 199 200 points = [] 201 doi_points = [] 202 layers = [] 203 headers = None 204 205 with open(path, 'r') as f: 206 for line_number, raw_line in enumerate(f, start=1): 207 stripped = raw_line.strip() 208 left_stripped = raw_line.lstrip() 209 210 if headers is None: 211 if is_header_line(left_stripped): 212 headers = normalize_header_tokens(left_stripped) 213 source_format = detect_format(headers) 214 if source_format == 'sci': 215 return _process_sci_rows(f, headers, line_number) 216 if source_format == 'atem_sci': 217 res_cols = get_numbered_columns(headers, 'Res') 218 elev_cols = get_numbered_columns(headers, 'Elev') 219 thick_cols = [] 220 elif source_format == 'temimage': 221 res_cols = get_numbered_columns(headers, 'Res_') 222 thick_cols = get_numbered_columns(headers, 'Thick_') 223 else: 224 res_cols = get_numbered_columns(headers, 'RHO_') 225 thick_cols = get_numbered_columns(headers, 'THK_') 226 dep_top_cols = get_numbered_columns(headers, 'DEP_TOP_') 227 dep_bot_cols = get_numbered_columns(headers, 'DEP_BOT_') 228 continue 229 if not left_stripped.startswith('/'): 230 raise ValueError('XYZ file does not contain a supported header row') 231 continue 232 233 if not stripped: 234 continue 235 236 values = stripped.split() 237 # When the data has one more column than the header, pandas would 238 # treat the first column as the row index. Replicate that here. 239 if len(values) == len(headers) + 1: 240 values = values[1:] 241 if len(values) != len(headers): 242 raise ValueError( 243 f'Row {line_number} has {len(values)} columns, ' 244 f'expected {len(headers)}' 245 ) 246 247 row = dict(zip(headers, values)) 248 if source_format == 'atem_sci': 249 _append_atem_sci_row( 250 row, 251 res_cols, 252 elev_cols, 253 points, 254 doi_points, 255 layers, 256 mask_below_doi, 257 below_doi_opacity, 258 ) 259 continue 260 261 x = float(row['X']) 262 y = float(row['Y']) 263 264 if source_format == 'temimage': 265 z = float(row['Z']) 266 doi = float(row['DOI']) 267 data_residual = float(row['DataResidual']) 268 n_layers = parse_num_layers(row['NumLayers']) 269 line = row['Line'] 270 station_no = row['StationNo'] 271 else: 272 z = float(row['ELEVATION']) 273 doi = float(row['DOI_STANDARD']) 274 data_residual = float(row['RESDATA']) 275 line = str(int(float(row['LINE_NO']))) 276 record = int(float(row['RECORD'])) 277 station_no = f'{line}_{record:05d}' 278 n_layers = count_valid_layers(row, res_cols, thick_cols) 279 280 z_doi = z - doi 281 point_wkt = f'POINT Z ({x} {y} {z})' 282 doi_wkt = f'POINT Z ({x} {y} {z_doi})' 283 284 points.append({ 285 'X': x, 286 'Y': y, 287 'Z': z, 288 'Line': line, 289 'StationNo': station_no, 290 'DataResidual': data_residual, 291 'NumLayers': n_layers, 292 'Geometry': point_wkt, 293 }) 294 doi_points.append({ 295 'X': x, 296 'Y': y, 297 'Z': z_doi, 298 'DOI': doi, 299 'ZDOI': z_doi, 300 'Geometry': doi_wkt, 301 }) 302 303 max_layers = min(len(res_cols), len(thick_cols)) 304 if n_layers is not None: 305 max_layers = min(max_layers, n_layers) 306 307 cum_depth = 0.0 308 for i in range(max_layers): 309 res_col = res_cols[i] 310 thick_col = thick_cols[i] 311 res_val = row.get(res_col, '') 312 thick_val = row.get(thick_col, '') 313 try: 314 res = float(res_val) 315 thick = float(thick_val) 316 except (ValueError, TypeError): 317 break 318 if math.isnan(res) or math.isnan(thick): 319 break 320 321 dep_top_col = dep_top_cols[i] if i < len(dep_top_cols) else None 322 dep_bot_col = dep_bot_cols[i] if i < len(dep_bot_cols) else None 323 if dep_top_col and dep_bot_col: 324 try: 325 depth_top = float(row[dep_top_col]) 326 depth_bottom = float(row[dep_bot_col]) 327 if math.isnan(depth_top) or math.isnan(depth_bottom): 328 raise ValueError 329 except (ValueError, TypeError): 330 depth_top = cum_depth 331 depth_bottom = cum_depth + thick 332 else: 333 depth_top = cum_depth 334 depth_bottom = cum_depth + thick 335 336 depth_mid = (depth_top + depth_bottom) / 2 337 z_top = z - depth_top 338 z_bot = z - depth_bottom 339 z_mid = (z_top + z_bot) / 2 340 cum_depth = depth_bottom 341 342 layer_wkt = f'LINESTRING Z ({x} {y} {z_top}, {x} {y} {z_bot})' 343 layers.append({ 344 'X': x, 345 'Y': y, 346 'Z': z, 347 'ZTop': z_top, 348 'ZMid': z_mid, 349 'ZBottom': z_bot, 350 'DepthTop': depth_top, 351 'DepthBottom': depth_bottom, 352 'Resistivity': res, 353 'Color': resistivity_color(res), 354 'Opacity': layer_opacity( 355 depth_mid, 356 doi, 357 mask_below_doi, 358 below_doi_opacity, 359 ), 360 'Layer': i + 1, 361 'Geometry': layer_wkt, 362 }) 363 364 if headers is None: 365 raise ValueError('XYZ file does not contain a supported header row') 366 367 return points, doi_points, layers 368 369 370 def _append_atem_sci_row( 371 row, 372 res_cols, 373 elev_cols, 374 points, 375 doi_points, 376 layers, 377 mask_below_doi=True, 378 below_doi_opacity=BELOW_DOI_OPACITY, 379 ): 380 x = float(row['E']) 381 y = float(row['N']) 382 z = float(row['DEM']) 383 doi = float(row['DOI']) 384 line = str(int(float(row['LINE']))) 385 fid = int(float(row['FID'])) 386 station_no = f'{line}_{fid:05d}' 387 388 z_tops = [] 389 for elev_col in elev_cols: 390 z_top = float(row[elev_col]) 391 if math.isnan(z_top): 392 break 393 z_tops.append(z_top) 394 395 layer_rows = [] 396 max_layers = min(len(res_cols), len(z_tops)) 397 for i in range(max_layers): 398 res = float(row[res_cols[i]]) 399 if math.isnan(res): 400 break 401 if i + 1 < len(z_tops): 402 z_bot = z_tops[i + 1] 403 elif i > 0: 404 z_bot = z_tops[i] - (z_tops[i - 1] - z_tops[i]) 405 else: 406 break 407 z_top = z_tops[i] 408 z_mid = (z_top + z_bot) / 2 409 depth_top = z - z_top 410 depth_bottom = z - z_bot 411 depth_mid = (depth_top + depth_bottom) / 2 412 layer_rows.append({ 413 'X': x, 414 'Y': y, 415 'Z': z, 416 'ZTop': z_top, 417 'ZMid': z_mid, 418 'ZBottom': z_bot, 419 'DepthTop': depth_top, 420 'DepthBottom': depth_bottom, 421 'Resistivity': res, 422 'Color': resistivity_color(res), 423 'Opacity': layer_opacity( 424 depth_mid, 425 doi, 426 mask_below_doi, 427 below_doi_opacity, 428 ), 429 'Layer': i + 1, 430 'Geometry': f'LINESTRING Z ({x} {y} {z_top}, {x} {y} {z_bot})', 431 }) 432 433 points.append({ 434 'X': x, 435 'Y': y, 436 'Z': z, 437 'Line': line, 438 'StationNo': station_no, 439 'DataResidual': float(row['RESI1']), 440 'NumLayers': len(layer_rows), 441 'Geometry': f'POINT Z ({x} {y} {z})', 442 }) 443 doi_points.append({ 444 'X': x, 445 'Y': y, 446 'Z': z - doi, 447 'DOI': doi, 448 'ZDOI': z - doi, 449 'Geometry': f'POINT Z ({x} {y} {z - doi})', 450 }) 451 layers.extend(layer_rows) 452 453 454 def _process_sci_rows(f, headers, header_line_number): 455 soundings = {} 456 for offset, raw_line in enumerate(f, start=1): 457 line_number = header_line_number + offset 458 stripped = raw_line.strip() 459 if not stripped: 460 continue 461 462 values = stripped.split() 463 if len(values) == len(headers) + 1: 464 values = values[1:] 465 if len(values) != len(headers): 466 raise ValueError( 467 f'Row {line_number} has {len(values)} columns, ' 468 f'expected {len(headers)}' 469 ) 470 471 row = dict(zip(headers, values)) 472 key = (int(float(row['Line_No'])), row['X'], row['Y']) 473 soundings.setdefault(key, []).append(row) 474 475 points = [] 476 layers = [] 477 line_ordinals = {} 478 479 for (line_no, x_str, y_str), rows in soundings.items(): 480 rows.sort(key=lambda r: int(float(r['Layer_No']))) 481 ordinal = line_ordinals.get(line_no, 0) + 1 482 line_ordinals[line_no] = ordinal 483 484 line = str(line_no) 485 station_no = f'{line}_{ordinal:05d}' 486 x = float(x_str) 487 y = float(y_str) 488 489 layer1 = rows[0] 490 surface_z = float(layer1['Elevation_Cell']) + float(layer1['Thickness']) / 2.0 491 492 n_layers = 0 493 for r in rows: 494 try: 495 res = float(r['Resistivity']) 496 thick = float(r['Thickness']) 497 depth_top = float(r['Depth_top']) 498 depth_bottom = float(r['Depth_bottom']) 499 except (ValueError, TypeError): 500 break 501 if math.isnan(res) or math.isnan(thick): 502 break 503 if math.isnan(depth_top) or math.isnan(depth_bottom): 504 break 505 506 z_top = surface_z - depth_top 507 z_bot = surface_z - depth_bottom 508 z_mid = (z_top + z_bot) / 2 509 layer_no = int(float(r['Layer_No'])) 510 511 layer_wkt = f'LINESTRING Z ({x} {y} {z_top}, {x} {y} {z_bot})' 512 layers.append({ 513 'X': x, 514 'Y': y, 515 'Z': surface_z, 516 'ZTop': z_top, 517 'ZMid': z_mid, 518 'ZBottom': z_bot, 519 'DepthTop': depth_top, 520 'DepthBottom': depth_bottom, 521 'Resistivity': res, 522 'Color': resistivity_color(res), 523 'Opacity': layer_opacity(depth_bottom, None), 524 'Layer': layer_no, 525 'Geometry': layer_wkt, 526 }) 527 n_layers += 1 528 529 point_wkt = f'POINT Z ({x} {y} {surface_z})' 530 points.append({ 531 'X': x, 532 'Y': y, 533 'Z': surface_z, 534 'Line': line, 535 'StationNo': station_no, 536 'NumLayers': n_layers, 537 'Geometry': point_wkt, 538 }) 539 540 return points, [], layers 541 542 543 def write_csv(rows, base_path, suffix): 544 out_path = Path(base_path).with_suffix(suffix) 545 if not rows: 546 out_path.write_text('') 547 return out_path 548 with open(out_path, 'w', newline='') as f: 549 writer = csv.DictWriter(f, fieldnames=list(rows[0].keys())) 550 writer.writeheader() 551 writer.writerows(rows) 552 return out_path