ags-upload

Insert AGS files into a database
git clone git://src.adamsgaard.dk/ags-upload # fast
git clone https://src.adamsgaard.dk/ags-upload.git # slow
Log | Files | Refs Back to index

main.go (9443B)


      1 package main
      2 
      3 import (
      4 	"bytes"
      5 	"encoding/csv"
      6 	"fmt"
      7 	"io"
      8 	"log"
      9 	"net/http"
     10 	"os"
     11 	"strconv"
     12 	"strings"
     13 
     14 	"github.com/gin-gonic/gin"
     15 	"gorm.io/driver/postgres"
     16 	"gorm.io/gorm"
     17 	"gorm.io/gorm/schema"
     18 )
     19 
     20 // One row per CPT test (SCPG). Carries duplicated PROJ fields and selected LOCA fields.
     21 type CptInfo struct {
     22 	ID uint `gorm:"primaryKey"`
     23 
     24 	ProjSourceId string // PROJ_ID
     25 	ProjName     string // PROJ_NAME
     26 	ProjLocation string // PROJ_LOC
     27 	ProjClient   string // PROJ_CLNT
     28 	ProjContract string // PROJ_CONT
     29 
     30 	// Test identity
     31 	LocationId    string `gorm:"index:idx_loc_tesn,unique"` // LOCA_ID
     32 	TestReference string `gorm:"index:idx_loc_tesn,unique"` // SCPG_TESN
     33 
     34 	// Selected LOCA fields for the site
     35 	CoordRef    string   // LOCA_LREF
     36 	Datum       string   // LOCA_DATM
     37 	Remarks     string   // LOCA_REM
     38 	Depth       *float64 // LOCA_FDEP (m)
     39 	GroundLevel *float64 // LOCA_GL (m)
     40 	LocX        *float64 // LOCA_LOCX (m)
     41 	LocY        *float64 // LOCA_LOCY (m)
     42 	LocZ        *float64 // LOCA_LOCZ (m)
     43 }
     44 
     45 // Per-depth CPT data (SCPT)
     46 type Cpt struct {
     47 	ID            uint     `gorm:"primaryKey"`
     48 	InfoId        uint     `gorm:"index"` // FK -> CptInfo.ID
     49 	LocationId    string   `gorm:"index"` // LOCA_ID (redundant but handy)
     50 	TestReference string   `gorm:"index"` // SCPG_TESN (redundant but handy)
     51 	Depth         *float64 // SCPT_DPTH
     52 	Qc            *float64 // SCPT_RES (cone resistance)
     53 	Fs            *float64 // SCPT_FRES (side friction)
     54 	U1            *float64 // SCPT_PWP1
     55 	U2            *float64 // SCPT_PWP2
     56 	U3            *float64 // SCPT_PWP3
     57 	Rf            *float64 // SCPT_FRR (friction ratio)
     58 	Qt            *float64 // SCPT_QT  (corrected cone resistance)
     59 }
     60 
     61 // ParsePROJ+LOCA+SCPG+SCPT in a single pass.
     62 // Returns: slice of CptInfo (one per SCPG), and slice of Cpt (SCPT data points)
     63 func ParseAGSCptAll(r io.Reader) ([]CptInfo, []Cpt, error) {
     64 	norm, err := dos2unix(r)
     65 	if err != nil {
     66 		return nil, nil, fmt.Errorf("read: %w", err)
     67 	}
     68 
     69 	cr := csv.NewReader(norm)
     70 	cr.FieldsPerRecord = -1
     71 	cr.LazyQuotes = true
     72 
     73 	var (
     74 		curGroup     string
     75 		headersByGrp = map[string]map[string]int{}
     76 
     77 		// first PROJ row captured
     78 		proj struct {
     79 			id, name, loc, clnt, cont string
     80 		}
     81 
     82 		// LOCA rows keyed by LOCA_ID
     83 		locas = map[string]struct {
     84 			LRef, Datum, Remarks        string
     85 			Depth, GroundLevel, X, Y, Z *float64
     86 		}{}
     87 
     88 		// CptInfo keyed by (LOCA_ID, SCPG_TESN)
     89 		infosByKey = map[string]CptInfo{}
     90 
     91 		// SCPT data rows
     92 		cpts []Cpt
     93 	)
     94 
     95 	get := func(group string, data []string, name string) string {
     96 		m := headersByGrp[group]
     97 		if m == nil {
     98 			return ""
     99 		}
    100 		if idx, ok := m[strings.ToUpper(name)]; ok && idx >= 0 && idx < len(data) {
    101 			return data[idx]
    102 		}
    103 		return ""
    104 	}
    105 	fptr := func(s string) *float64 {
    106 		s = strings.TrimSpace(s)
    107 		if s == "" {
    108 			return nil
    109 		}
    110 		s = strings.ReplaceAll(s, ",", ".")
    111 		if f, err := strconv.ParseFloat(s, 64); err == nil {
    112 			return &f
    113 		}
    114 		return nil
    115 	}
    116 	mapKey := func(locID, tesn string) string { return locID + "\x00" + tesn }
    117 
    118 	for {
    119 		rec, err := cr.Read()
    120 		if err == io.EOF {
    121 			break
    122 		}
    123 		if err != nil {
    124 			return nil, nil, fmt.Errorf("csv: %w", err)
    125 		}
    126 		if len(rec) == 0 {
    127 			continue
    128 		}
    129 		for i := range rec {
    130 			rec[i] = strings.TrimSpace(rec[i])
    131 		}
    132 
    133 		switch strings.ToUpper(rec[0]) {
    134 		case "GROUP":
    135 			if len(rec) > 1 {
    136 				curGroup = strings.ToUpper(strings.TrimSpace(rec[1]))
    137 			} else {
    138 				curGroup = ""
    139 			}
    140 
    141 		case "HEADING":
    142 			if curGroup == "" {
    143 				continue
    144 			}
    145 			m := make(map[string]int, len(rec)-1)
    146 			for i := 1; i < len(rec); i++ {
    147 				m[strings.ToUpper(strings.TrimSpace(rec[i]))] = i - 1
    148 			}
    149 			headersByGrp[curGroup] = m
    150 
    151 		case "DATA":
    152 			if curGroup == "" {
    153 				continue
    154 			}
    155 			row := rec[1:]
    156 
    157 			switch curGroup {
    158 			case "PROJ":
    159 				if proj.id == "" {
    160 					proj.id = get("PROJ", row, "PROJ_ID")
    161 					proj.name = get("PROJ", row, "PROJ_NAME")
    162 					proj.loc = get("PROJ", row, "PROJ_LOC")
    163 					proj.clnt = get("PROJ", row, "PROJ_CLNT")
    164 					proj.cont = get("PROJ", row, "PROJ_CONT")
    165 				}
    166 
    167 			case "LOCA":
    168 				lid := get("LOCA", row, "LOCA_ID")
    169 				if lid == "" {
    170 					break
    171 				}
    172 				locas[lid] = struct {
    173 					LRef, Datum, Remarks        string
    174 					Depth, GroundLevel, X, Y, Z *float64
    175 				}{
    176 					LRef:        get("LOCA", row, "LOCA_LREF"),
    177 					Datum:       get("LOCA", row, "LOCA_DATM"),
    178 					Remarks:     get("LOCA", row, "LOCA_REM"),
    179 					Depth:       fptr(get("LOCA", row, "LOCA_FDEP")),
    180 					GroundLevel: fptr(get("LOCA", row, "LOCA_GL")),
    181 					X:           fptr(get("LOCA", row, "LOCA_LOCX")),
    182 					Y:           fptr(get("LOCA", row, "LOCA_LOCY")),
    183 					Z:           fptr(get("LOCA", row, "LOCA_LOCZ")),
    184 				}
    185 
    186 			case "SCPG":
    187 				locID := get("SCPG", row, "LOCA_ID")
    188 				tesn := get("SCPG", row, "SCPG_TESN")
    189 				if locID == "" || tesn == "" {
    190 					break
    191 				}
    192 				li := locas[locID]
    193 				infosByKey[mapKey(locID, tesn)] = CptInfo{
    194 					ProjSourceId: proj.id,
    195 					ProjName:     proj.name,
    196 					ProjLocation: proj.loc,
    197 					ProjClient:   proj.clnt,
    198 					ProjContract: proj.cont,
    199 
    200 					LocationId:    locID,
    201 					TestReference: tesn,
    202 
    203 					CoordRef:    li.LRef,
    204 					Datum:       li.Datum,
    205 					Remarks:     li.Remarks,
    206 					Depth:       li.Depth,
    207 					GroundLevel: li.GroundLevel,
    208 					LocX:        li.X,
    209 					LocY:        li.Y,
    210 					LocZ:        li.Z,
    211 				}
    212 
    213 			case "SCPT":
    214 				locID := get("SCPT", row, "LOCA_ID")
    215 				tesn := get("SCPT", row, "SCPG_TESN") // links SCPT to SCPG
    216 				cpts = append(cpts, Cpt{
    217 					LocationId:    locID,
    218 					TestReference: tesn,
    219 					Depth:         fptr(get("SCPT", row, "SCPT_DPTH")),
    220 					Qc:            fptr(get("SCPT", row, "SCPT_RES")),
    221 					Fs:            fptr(get("SCPT", row, "SCPT_FRES")),
    222 					U1:            fptr(get("SCPT", row, "SCPT_PWP1")),
    223 					U2:            fptr(get("SCPT", row, "SCPT_PWP2")),
    224 					U3:            fptr(get("SCPT", row, "SCPT_PWP3")),
    225 					Rf:            fptr(get("SCPT", row, "SCPT_FRR")),
    226 					Qt:            fptr(get("SCPT", row, "SCPT_QT")),
    227 				})
    228 			}
    229 		}
    230 	}
    231 
    232 	// Flatten infos map to slice
    233 	infos := make([]CptInfo, 0, len(infosByKey))
    234 	for _, v := range infosByKey {
    235 		infos = append(infos, v)
    236 	}
    237 	return infos, cpts, nil
    238 }
    239 
    240 func dos2unix(r io.Reader) (io.Reader, error) {
    241 	all, err := io.ReadAll(r)
    242 	if err != nil {
    243 		return nil, err
    244 	}
    245 	all = bytes.ReplaceAll(all, []byte("\r\n"), []byte("\n"))
    246 	all = bytes.ReplaceAll(all, []byte("\r"), []byte("\n"))
    247 	return bytes.NewReader(all), nil
    248 }
    249 
    250 func main() {
    251 	dsn := os.Getenv("DB_CONN")
    252 	dbSchema := "jupiter"
    253 
    254 	db, err := gorm.Open(postgres.Open(dsn),
    255 		&gorm.Config{
    256 			NamingStrategy: schema.NamingStrategy{
    257 				TablePrefix:   dbSchema + ".",
    258 				SingularTable: false,
    259 			},
    260 		})
    261 	if err != nil {
    262 		log.Fatal(err)
    263 	}
    264 
    265 	sql := fmt.Sprintf(`CREATE SCHEMA IF NOT EXISTS "%s"`, dbSchema)
    266 	if err := db.Exec(sql).Error; err != nil {
    267 		log.Fatal(err)
    268 	}
    269 
    270 	if err := db.AutoMigrate(&CptInfo{}, &Cpt{}); err != nil {
    271 		log.Fatal(err)
    272 	}
    273 
    274 	r := gin.Default()
    275 	r.MaxMultipartMemory = 32 << 20 // ~32 MB
    276 
    277 	r.POST("/ingest/ags", func(c *gin.Context) {
    278 		file, _, err := c.Request.FormFile("file")
    279 		if err != nil {
    280 			c.String(http.StatusBadRequest, "missing multipart file: %v", err)
    281 			return
    282 		}
    283 		defer file.Close()
    284 
    285 		infos, cpts, err := ParseAGSCptAll(file)
    286 		if err != nil {
    287 			c.String(http.StatusBadRequest, "parse error: %v", err)
    288 			return
    289 		}
    290 
    291 		err = db.Transaction(func(tx *gorm.DB) error {
    292             scptKeys := make(map[string]struct{}, len(cpts))
    293             key := func(locID, tesn string) string { return locID + "\x00" + tesn }
    294             for i := range cpts {
    295                 if cpts[i].LocationId == "" || cpts[i].TestReference == "" {
    296                     continue
    297                 }
    298                 scptKeys[key(cpts[i].LocationId, cpts[i].TestReference)] = struct{}{}
    299             }
    300 
    301             filtered := make([]CptInfo, 0, len(infos))
    302             for i := range infos {
    303                 k := key(infos[i].LocationId, infos[i].TestReference)
    304                 if _, ok := scptKeys[k]; ok {
    305                     filtered = append(filtered, infos[i])
    306                 }
    307             }
    308 
    309             lookup := make(map[string]uint, len(filtered))
    310             for i := range filtered {
    311                 ci := filtered[i] // copy for pointer stability
    312                 if err := tx.
    313                     Where("location_id = ? AND test_reference = ?", ci.LocationId, ci.TestReference).
    314                     Assign(&ci).
    315                     FirstOrCreate(&ci).Error; err != nil {
    316                     return err
    317                 }
    318                 lookup[key(ci.LocationId, ci.TestReference)] = ci.ID
    319             }
    320 
    321             out := make([]Cpt, 0, len(cpts))
    322             for i := range cpts {
    323                 id := lookup[key(cpts[i].LocationId, cpts[i].TestReference)]
    324                 if id == 0 {
    325                     continue // SCPT without a matching filtered info (or missing key) → skip
    326                 }
    327                 cpts[i].InfoId = id
    328                 out = append(out, cpts[i])
    329             }
    330 
    331             if len(out) > 0 {
    332                 if err := tx.CreateInBatches(out, 2000).Error; err != nil {
    333                     return err
    334                 }
    335             }
    336 
    337             // Optional: if nothing to insert at all, you might return an error or 204 outside
    338             return nil
    339 		})
    340 
    341 		if err != nil {
    342 			c.String(http.StatusInternalServerError, "db error: %v", err)
    343 			return
    344 		}
    345 
    346 		c.JSON(http.StatusCreated, gin.H{
    347 			"n_cpts":  len(cpts),
    348 		})
    349 	})
    350 
    351 	_ = r.Run(":8080")
    352 }