commit 5398f115e1bb370afbd22e3812182ecc60d07e5a
parent 776acf75daf694f6fa229ddaa414b8bbbbdd9c0b
Author: Anders Damsgaard <anders@adamsgaard.dk>
Date: Sun, 10 May 2026 00:05:03 +0200
feat(qgis): add GeoPackage writer helpers
Diffstat:
| M | tem_loader/tem_loader.py | | | 76 | ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ |
| M | test/test_core.py | | | 251 | ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++------- |
2 files changed, 306 insertions(+), 21 deletions(-)
diff --git a/tem_loader/tem_loader.py b/tem_loader/tem_loader.py
@@ -18,6 +18,82 @@ from . import core
STYLES_DIR = Path(__file__).parent / 'styles'
+GEOMETRY_FIELD = 'Geometry'
+STRING_FIELDS = {'Line', 'StationNo', 'Color'}
+INTEGER_FIELDS = {'NumLayers', 'Layer'}
+
+
+def _build_geopackage_uri(gpkg_path, layer_name):
+ return f'{gpkg_path.resolve()}|layername={layer_name}'
+
+
+def _field_type(field_name):
+ if field_name in STRING_FIELDS:
+ return QMetaType.Type.QString
+ if field_name in INTEGER_FIELDS:
+ return QMetaType.Type.Int
+ return QMetaType.Type.Double
+
+
+def _build_fields(rows):
+ fields = QgsFields()
+ if not rows:
+ return fields
+ for field_name in rows[0]:
+ if field_name == GEOMETRY_FIELD:
+ continue
+ fields.append(QgsField(field_name, _field_type(field_name)))
+ return fields
+
+
+def _rows_to_features(rows, fields):
+ field_names = [field.name() for field in fields]
+ features = []
+ for row in rows:
+ feature = QgsFeature(fields)
+ feature.setGeometry(QgsGeometry.fromWkt(row[GEOMETRY_FIELD]))
+ feature.setAttributes([row.get(name) for name in field_names])
+ features.append(feature)
+ return features
+
+
+def _writer_no_error():
+ writer_error = getattr(QgsVectorFileWriter, 'WriterError', QgsVectorFileWriter)
+ return writer_error.NoError
+
+
+def _write_geopackage_layer(rows, gpkg_path, layer_name, wkb_type, crs,
+ transform_context, action):
+ fields = _build_fields(rows)
+ options = QgsVectorFileWriter.SaveVectorOptions()
+ options.driverName = 'GPKG'
+ options.fileEncoding = 'UTF-8'
+ options.layerName = layer_name
+ options.actionOnExistingFile = action
+
+ writer = QgsVectorFileWriter.create(
+ str(gpkg_path),
+ fields,
+ wkb_type,
+ crs,
+ transform_context,
+ options,
+ )
+ if writer is None:
+ raise ValueError(f'failed to create GeoPackage layer {layer_name}')
+
+ try:
+ if writer.hasError() != _writer_no_error():
+ message = writer.errorMessage()
+ details = f': {message}' if message else ''
+ raise ValueError(
+ f'failed to create GeoPackage layer {layer_name}{details}'
+ )
+ features = _rows_to_features(rows, fields)
+ if features and not writer.addFeatures(features):
+ raise ValueError(f'failed to write GeoPackage layer {layer_name}')
+ finally:
+ del writer
def _build_delimited_text_uri(csv_path, geom_type, crs_str):
diff --git a/test/test_core.py b/test/test_core.py
@@ -339,18 +339,123 @@ class PluginTests(unittest.TestCase):
qtwidgets.QFileDialog = FakeFileDialog
qtwidgets.QMessageBox = FakeMessageBox
+ class FakeQgis:
+ class WkbType:
+ PointZ = "PointZ"
+ LineStringZ = "LineStringZ"
+
+ class FakeField:
+ def __init__(self, name, field_type):
+ self._name = name
+ self.field_type = field_type
+
+ def name(self):
+ return self._name
+
+ class FakeFields:
+ def __init__(self):
+ self._fields = []
+
+ def append(self, field):
+ self._fields.append(field)
+
+ def __iter__(self):
+ return iter(self._fields)
+
+ def __len__(self):
+ return len(self._fields)
+
+ def __getitem__(self, index):
+ return self._fields[index]
+
+ class FakeGeometry:
+ def __init__(self, wkt):
+ self.wkt = wkt
+
+ @staticmethod
+ def fromWkt(wkt):
+ return FakeGeometry(wkt)
+
+ class FakeFeature:
+ def __init__(self, fields):
+ self.fields = fields
+ self.geometry = None
+ self.attributes = None
+
+ def setGeometry(self, geometry):
+ self.geometry = geometry
+
+ def setAttributes(self, attributes):
+ self.attributes = list(attributes)
+
+ class FakeSaveVectorOptions:
+ def __init__(self):
+ self.driverName = None
+ self.fileEncoding = None
+ self.layerName = None
+ self.actionOnExistingFile = None
+
+ class FakeVectorFileWriter:
+ NoError = "NoError"
+ CreateOrOverwriteFile = "CreateOrOverwriteFile"
+ CreateOrOverwriteLayer = "CreateOrOverwriteLayer"
+ calls = []
+ created = []
+ next_writer = None
+
+ class WriterError:
+ NoError = "NoError"
+
+ SaveVectorOptions = FakeSaveVectorOptions
+
+ def __init__(self, error=NoError, message="", add_features_result=True):
+ self._error = error
+ self._message = message
+ self._add_features_result = add_features_result
+ self.features = []
+
+ @staticmethod
+ def create(
+ file_name, fields, geometry_type, crs, transform_context, options
+ ):
+ writer = FakeVectorFileWriter.next_writer or FakeVectorFileWriter()
+ FakeVectorFileWriter.next_writer = None
+ FakeVectorFileWriter.calls.append({
+ "fileName": file_name,
+ "fields": fields,
+ "geometryType": geometry_type,
+ "crs": crs,
+ "transformContext": transform_context,
+ "driverName": options.driverName,
+ "fileEncoding": options.fileEncoding,
+ "layerName": options.layerName,
+ "actionOnExistingFile": options.actionOnExistingFile,
+ })
+ FakeVectorFileWriter.created.append(writer)
+ return writer
+
+ def hasError(self):
+ return self._error
+
+ def errorMessage(self):
+ return self._message
+
+ def addFeatures(self, features):
+ self.features.extend(features)
+ return self._add_features_result
+
qgis_core = types.ModuleType("qgis.core")
- qgis_core.Qgis = type("Qgis", (), {})
+ qgis_core.Qgis = FakeQgis
qgis_core.QgsProject = type("QgsProject", (), {})
qgis_core.QgsVectorLayer = type("QgsVectorLayer", (), {})
qgis_core.QgsCoordinateReferenceSystem = type(
"QgsCoordinateReferenceSystem", (), {}
)
- qgis_core.QgsFeature = type("QgsFeature", (), {})
- qgis_core.QgsField = type("QgsField", (), {})
- qgis_core.QgsFields = type("QgsFields", (), {})
- qgis_core.QgsGeometry = type("QgsGeometry", (), {})
- qgis_core.QgsVectorFileWriter = type("QgsVectorFileWriter", (), {})
+ qgis_core.QgsFeature = FakeFeature
+ qgis_core.QgsField = FakeField
+ qgis_core.QgsFields = FakeFields
+ qgis_core.QgsGeometry = FakeGeometry
+ qgis_core.QgsVectorFileWriter = FakeVectorFileWriter
module_map = {
"qgis": types.ModuleType("qgis"),
@@ -383,24 +488,128 @@ class PluginTests(unittest.TestCase):
self.assertIn("bad.xyz", message_box.warnings[0][2])
self.assertIn("Row 3 has 4 columns, expected 6", message_box.warnings[0][2])
- def test_build_delimited_text_uri_preserves_local_file_uri(self):
+ def test_build_geopackage_uri_uses_layername(self):
+ module, _, _ = self._import_plugin_module()
+ gpkg_path = Mock()
+ gpkg_path.resolve.return_value = Path("/tmp/model.gpkg")
+
+ uri = module._build_geopackage_uri(gpkg_path, "layers")
+
+ self.assertEqual(uri, "/tmp/model.gpkg|layername=layers")
+
+ def test_build_fields_skips_geometry_and_uses_expected_types(self):
module, _, _ = self._import_plugin_module()
- csv_path = Mock()
- resolved = Mock()
- resolved.as_uri.return_value = "file:///C:/temp/model.layers.csv"
- csv_path.resolve.return_value = resolved
+ rows = [
+ {
+ "X": 1.0,
+ "Line": "1",
+ "StationNo": "1_00001",
+ "NumLayers": 30,
+ "Layer": 1,
+ "Color": "#00ff00",
+ "Geometry": "POINT Z (1 2 3)",
+ }
+ ]
+
+ fields = module._build_fields(rows)
- uri = module._build_delimited_text_uri(
- csv_path, "LineString", "EPSG:32632"
+ self.assertEqual(
+ [field.name() for field in fields],
+ ["X", "Line", "StationNo", "NumLayers", "Layer", "Color"],
+ )
+ self.assertEqual(
+ [field.field_type for field in fields],
+ [
+ module.QMetaType.Type.Double,
+ module.QMetaType.Type.QString,
+ module.QMetaType.Type.QString,
+ module.QMetaType.Type.Int,
+ module.QMetaType.Type.Int,
+ module.QMetaType.Type.QString,
+ ],
+ )
+
+ def test_rows_to_features_copies_geometry_and_attributes(self):
+ module, _, _ = self._import_plugin_module()
+ rows = [
+ {
+ "X": 1.0,
+ "Line": "7",
+ "Geometry": "POINT Z (1 2 3)",
+ }
+ ]
+ fields = module._build_fields(rows)
+
+ features = module._rows_to_features(rows, fields)
+
+ self.assertEqual(len(features), 1)
+ self.assertEqual(features[0].geometry.wkt, "POINT Z (1 2 3)")
+ self.assertEqual(features[0].attributes, [1.0, "7"])
+
+ def test_write_geopackage_layer_configures_writer_and_features(self):
+ module, _, _ = self._import_plugin_module()
+ rows = [
+ {
+ "X": 1.0,
+ "Y": 2.0,
+ "Layer": 1,
+ "Color": "#00ff00",
+ "Geometry": "LINESTRING Z (1 2 3, 1 2 2)",
+ }
+ ]
+ crs = object()
+ transform_context = object()
+
+ module._write_geopackage_layer(
+ rows,
+ Path("/tmp/model.gpkg"),
+ "layers",
+ module.Qgis.WkbType.LineStringZ,
+ crs,
+ transform_context,
+ module.QgsVectorFileWriter.CreateOrOverwriteFile,
)
+ call = module.QgsVectorFileWriter.calls[0]
+ writer = module.QgsVectorFileWriter.created[0]
+ self.assertEqual(call["fileName"], "/tmp/model.gpkg")
+ self.assertEqual(call["geometryType"], module.Qgis.WkbType.LineStringZ)
+ self.assertIs(call["crs"], crs)
+ self.assertIs(call["transformContext"], transform_context)
+ self.assertEqual(call["driverName"], "GPKG")
+ self.assertEqual(call["fileEncoding"], "UTF-8")
+ self.assertEqual(call["layerName"], "layers")
self.assertEqual(
- uri,
- "file:///C:/temp/model.layers.csv"
- "?type=LineString"
- "&crs=EPSG:32632"
- "&wktField=Geometry"
- "&delimiter=,"
- "&xField=X&yField=Y"
- "&spatialIndex=yes",
+ call["actionOnExistingFile"],
+ module.QgsVectorFileWriter.CreateOrOverwriteFile,
)
+ self.assertEqual(len(writer.features), 1)
+ self.assertEqual(
+ writer.features[0].geometry.wkt,
+ "LINESTRING Z (1 2 3, 1 2 2)",
+ )
+ self.assertEqual(
+ writer.features[0].attributes,
+ [1.0, 2.0, 1, "#00ff00"],
+ )
+
+ def test_write_geopackage_layer_reports_writer_error(self):
+ module, _, _ = self._import_plugin_module()
+ module.QgsVectorFileWriter.next_writer = module.QgsVectorFileWriter(
+ error="ErrCreateLayer",
+ message="disk full",
+ )
+
+ with self.assertRaisesRegex(
+ ValueError,
+ "failed to create GeoPackage layer points: disk full",
+ ):
+ module._write_geopackage_layer(
+ [{"X": 1.0, "Geometry": "POINT Z (1 2 3)"}],
+ Path("/tmp/model.gpkg"),
+ "points",
+ module.Qgis.WkbType.PointZ,
+ object(),
+ object(),
+ module.QgsVectorFileWriter.CreateOrOverwriteFile,
+ )