Skip to content

Handlers Module

create_geotiff(layer, output_file, crs, spatial_extent, nodata=-9999)

Creates a GeoTIFF file from the given data layer. Note, it performs a transpose on the layer.

:param layer: The data layer to be written into the GeoTIFF file. Assumes (X, Y) shape. :type layer: numpy.ndarray :param output_file: The path where the GeoTIFF file will be saved. :type output_file: str :param crs: The coordinate reference system for the GeoTIFF. :type crs: str :param spatial_extent: The spatial extent of the data, defined as (x_min, x_max, y_min, y_max). :type spatial_extent: tuple :param nodata: The value to use for NoData areas in the GeoTIFF. Defaults to -9999. :type nodata: float or int, optional :return: None :raises rasterio.errors.RasterioError: If there is an error in creating the GeoTIFF. :raises ValueError: If the layer has invalid dimensions or the spatial extent is invalid.

Source code in pyforestscan/handlers.py
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
def create_geotiff(layer, output_file, crs, spatial_extent, nodata=-9999):
    """
    Creates a GeoTIFF file from the given data layer. Note, it performs a transpose on the layer.

    :param layer: The data layer to be written into the GeoTIFF file. Assumes (X, Y) shape.
    :type layer: numpy.ndarray
    :param output_file: The path where the GeoTIFF file will be saved.
    :type output_file: str
    :param crs: The coordinate reference system for the GeoTIFF.
    :type crs: str
    :param spatial_extent: The spatial extent of the data, defined as (x_min, x_max, y_min, y_max).
    :type spatial_extent: tuple
    :param nodata: The value to use for NoData areas in the GeoTIFF. Defaults to -9999.
    :type nodata: float or int, optional
    :return: None
    :raises rasterio.errors.RasterioError: If there is an error in creating the GeoTIFF.
    :raises ValueError: If the layer has invalid dimensions or the spatial extent is invalid.
    """
    x_min, x_max, y_min, y_max = spatial_extent

    if layer.size == 0 or layer.shape[0] == 0 or layer.shape[1] == 0:
        raise ValueError(f"Invalid layer dimensions: {layer.shape}. Cannot create GeoTIFF.")

    if x_max <= x_min or y_max <= y_min:
        raise ValueError(f"Invalid spatial extent: {spatial_extent}.")

    # Replace NaNs with the caller-supplied nodata value so masked areas match
    # the `nodata` tag written to the file (bug fix: was hardcoded to -9999,
    # which silently disagreed with any non-default `nodata` argument).
    layer = np.nan_to_num(layer, nan=nodata)

    # Incoming data is (X, Y); rasterio expects (rows, cols) = (Y, X).
    layer = layer.T

    # Affine transform mapping pixel coordinates to the spatial extent.
    transform = from_bounds(
        x_min, y_min, x_max, y_max,
        layer.shape[1], layer.shape[0]
    )

    with rasterio.open(
            output_file, 'w',
            driver='GTiff',
            height=layer.shape[0],
            width=layer.shape[1],
            count=1,
            dtype=layer.dtype.name,
            crs=crs,
            transform=transform,
            nodata=nodata
    ) as new_dataset:
        new_dataset.write(layer, 1)

get_raster_epsg(dtm_path)

Retrieve the EPSG code from a raster file.

:param dtm_path: str The file path to the raster file. :return: str The EPSG code as a string. :raises FileNotFoundError: If the specified file does not exist.

Source code in pyforestscan/handlers.py
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
def get_raster_epsg(dtm_path):
    """
    Return the CRS of a raster file as a string (e.g. an EPSG identifier).

    :param dtm_path: str
        Path to the raster file to inspect.
    :return: str
        The raster's CRS rendered as a string.
    :raises FileNotFoundError:
        If no file exists at ``dtm_path``.
    """
    if not os.path.isfile(dtm_path):
        raise FileNotFoundError(f"No such file: '{dtm_path}'")
    with rasterio.open(dtm_path) as raster:
        return raster.crs.to_string()

load_polygon_from_file(vector_file_path, index=0)

Load a polygon geometry and its CRS from a given vector file.

:param vector_file_path: str, Path to the vector file containing the polygon. :param index: int, optional, Index of the polygon to be loaded (default is 0). :return: tuple, containing the Well-Known Text (WKT) representation of the polygon and the coordinate reference system (CRS) as a string. :raises FileNotFoundError: If the vector file does not exist. :raises ValueError: If the file cannot be read or is not a valid vector file format.

Source code in pyforestscan/handlers.py
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
def load_polygon_from_file(vector_file_path, index=0):
    """
    Read one polygon geometry (and the file's CRS) from a vector file.

    :param vector_file_path: str, Path to the vector file containing the polygon.
    :param index: int, optional, Row index of the geometry to load (default 0).
    :return: tuple of (polygon WKT string, CRS string).
    :raises FileNotFoundError: If the vector file does not exist.
    :raises ValueError: If the file cannot be parsed as a vector format.
    """
    if not os.path.isfile(vector_file_path):
        raise FileNotFoundError(f"No such file: '{vector_file_path}'")

    try:
        gdf = gpd.read_file(vector_file_path)
    except Exception as e:
        raise ValueError(f"Unable to read file: {vector_file_path}. Ensure it is a valid vector file format.") from e

    geometry = gdf.loc[index, 'geometry']
    # A MultiPolygon is collapsed to its first member polygon.
    if isinstance(geometry, MultiPolygon):
        geometry = next(iter(geometry.geoms))
    return geometry.wkt, gdf.crs.to_string()

read_lidar(input_file, srs, bounds=None, thin_radius=None, hag=False, hag_dtm=False, dtm=None, crop_poly=False, poly=None)

Reads and processes a LiDAR point cloud file using PDAL based on specified options.

:param input_file: str, The path to the input LiDAR file. Supported formats are .las, .laz, .copc, and .copc.laz. :param srs: str, The Spatial Reference System (SRS) of the point cloud. :param bounds: Bounds within which to crop the data. Only in effect for ept format. Must be of the form: ([xmin, xmax], [ymin, ymax], [zmin, zmax]) :param thin_radius: float, optional, The radius for thinning the point cloud. Must be a positive number. :param hag: bool, optional, If True, calculate Height Above Ground (HAG) using Delaunay triangulation. :param hag_dtm: bool, optional, If True, calculate Height Above Ground (HAG) using a DTM file. :param dtm: str, optional, The path to the DTM file used when hag_dtm is True. Must be a .tif file. :param crop_poly: bool, optional, If True, crop the point cloud using the polygon defined in the poly file. :param poly: str, optional, The path to the polygon file used for cropping OR the WKT of the Polygon geometry.

:return: numpy.ndarray, The processed point cloud data or None if no data is retrieved.

:raises FileNotFoundError: If the input file, polygon file, or DTM file does not exist. :raises ValueError: If the input file extension is unsupported, thinning radius is non-positive, or both 'hag' and 'hag_dtm' are True simultaneously, or the DTM file path is not provided when 'hag_dtm' is True.

Source code in pyforestscan/handlers.py
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
def read_lidar(input_file, srs, bounds=None, thin_radius=None, hag=False, hag_dtm=False, dtm=None, crop_poly=False, poly=None):
    """
    Reads and processes a LiDAR point cloud file using PDAL based on specified options.

    :param input_file: str, The path to the input LiDAR file. Supported formats are .las, .laz, .copc, .copc.laz, and ept.json.
    :param srs: str, The Spatial Reference System (SRS) of the point cloud.
    :param bounds: Bounds within which to crop the data. Only in effect for ept format. Must be of the form: ([xmin, xmax], [ymin, ymax], [zmin, zmax])
    :param thin_radius: float, optional, The radius for thinning the point cloud. Must be a positive number.
    :param hag: bool, optional, If True, calculate Height Above Ground (HAG) using Delaunay triangulation.
    :param hag_dtm: bool, optional, If True, calculate Height Above Ground (HAG) using a DTM file.
    :param dtm: str, optional, The path to the DTM file used when hag_dtm is True. Must be a .tif file.
    :param crop_poly: bool, optional, If True, crop the point cloud using the polygon defined in the poly file.
    :param poly: str, optional, The path to the polygon file used for cropping OR the WKT of the Polygon geometry.

    :return: numpy.ndarray, The processed point cloud data or None if no data is retrieved.

    :raises FileNotFoundError: If the input file, polygon file, or DTM file does not exist.
    :raises ValueError: If the input file extension is unsupported, thinning radius is non-positive, or
                        both 'hag' and 'hag_dtm' are True simultaneously, or the DTM file path is not
                        provided when 'hag_dtm' is True.
    """
    if not _is_url(input_file) and not os.path.isfile(input_file):
        raise FileNotFoundError(f"No such file: '{input_file}'")

    file_lower = input_file.lower()
    # Check COPC extensions before plain LAS/LAZ: '.copc.laz' also ends with
    # '.laz', so testing '.laz' first would wrongly route COPC files to
    # readers.las and bypass the COPC-aware reader (bug fix: order swapped).
    if file_lower.endswith(('.copc', '.copc.laz')):
        reader = 'readers.copc'
    elif file_lower.endswith(('.las', '.laz')):
        reader = 'readers.las'
    elif file_lower.endswith('ept.json'):
        reader = 'readers.ept'
    else:
        raise ValueError(
            "The input file must be a .las, .laz, .copc, .copc.laz file, or an ept.json file."
        )

    if hag and hag_dtm:
        raise ValueError("Cannot use both 'hag' and 'hag_dtm' options at the same time.")

    pipeline_stages = []
    # NOTE(review): CRSs are collected here but never validated against `srs`
    # within this function — presumably a hook for validate_crs; confirm intent.
    crs_list = []

    if crop_poly:
        if not poly:
            raise ValueError("Must provide a polygon or polygon wkt if cropping to a polygon.")
        if poly.strip().startswith(('POLYGON', 'MULTIPOLYGON')):
            # `poly` is inline WKT rather than a file path.
            polygon_wkt = poly
        else:
            if not os.path.isfile(poly):
                raise FileNotFoundError(f"No such polygon file: '{poly}'")
            polygon_wkt, crs_vector = load_polygon_from_file(poly)
        pipeline_stages.append(_crop_polygon(polygon_wkt))

    if thin_radius is not None:
        if thin_radius <= 0:
            raise ValueError("Thinning radius must be a positive number.")
        pipeline_stages.append(_filter_radius(thin_radius))

    if hag:
        pipeline_stages.append(_hag_delaunay())

    if hag_dtm:
        if not dtm:
            raise ValueError("DTM file path must be provided when 'hag_dtm' is True.")
        if not os.path.isfile(dtm):
            raise FileNotFoundError(f"No such DTM file: '{dtm}'")
        if not dtm.lower().endswith('.tif'):
            raise ValueError("The DTM file must be a .tif file.")
        crs_raster = get_raster_epsg(dtm)
        crs_list.append(crs_raster)
        pipeline_stages.append(_hag_raster(dtm))

    base_pipeline = {
        "type": reader,
        "spatialreference": srs,
        "filename": input_file
    }
    # PDAL's EPT reader accepts a bounds string; other readers ignore bounds.
    if bounds and reader == 'readers.ept':
        base_pipeline["bounds"] = f"{bounds}"
    main_pipeline_json = {
        "pipeline": [
            base_pipeline
        ] + pipeline_stages
    }

    main_pipeline = pdal.Pipeline(json.dumps(main_pipeline_json))
    main_pipeline.execute()

    point_cloud = main_pipeline.arrays
    return point_cloud if point_cloud else None

simplify_crs(crs_list)

Converts a list of CRS representations to their corresponding EPSG codes.

:param crs_list: List of CRS definitions to be simplified. :type crs_list: list :return: List of EPSG codes corresponding to the input CRS definitions. :rtype: list :raises CRSError: If any of the CRS definitions cannot be converted to an EPSG code.

Source code in pyforestscan/handlers.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
def simplify_crs(crs_list):
    """
    Map each CRS representation in a list to its EPSG code.

    :param crs_list: CRS definitions to resolve.
    :type crs_list: list
    :return: EPSG codes, one per input CRS, in order.
    :rtype: list
    :raises CRSError: If a CRS cannot be parsed or has no EPSG equivalent.
    """
    codes = []
    for crs in crs_list:
        try:
            code = CRS(crs).to_epsg()
            # No EPSG match: raise here so the wrapper below adds context.
            if code is None:
                raise CRSError(f"Cannot convert CRS '{crs}' to an EPSG code.")
        except CRSError as e:
            raise CRSError(f"Error converting CRS '{crs}': {e}") from None
        codes.append(code)
    return codes

validate_crs(crs_list)

Validate that all CRS representations in the list are identical.

:param crs_list: List of coordinate reference systems to validate. :type crs_list: list :return: True if all CRSs match. :rtype: bool :raises ValueError: If the CRSs do not match.

Source code in pyforestscan/handlers.py
157
158
159
160
161
162
163
164
165
166
167
168
169
170
def validate_crs(crs_list):
    """
    Check that every CRS in the list resolves to the same EPSG code.

    :param crs_list: Coordinate reference systems to compare.
    :type crs_list: list
    :return: True when all CRSs agree (or the list is empty/singleton).
    :rtype: bool
    :raises ValueError: If two or more distinct CRSs are present.
    """
    codes = simplify_crs(crs_list)
    if len(set(codes)) > 1:
        raise ValueError("The CRS of the inputs do not match.")
    return True

validate_extensions(las_file_path, dtm_file_path)

Validates the extensions of the provided file paths to check if they match the required .las/.laz for point cloud files and .tif for DTM files.

:param las_file_path: The file path of the point cloud file. Supported extensions are .las and .laz. :type las_file_path: str :param dtm_file_path: The file path of the DTM (Digital Terrain Model) file. Supported extension is .tif. :type dtm_file_path: str :raises ValueError: If the point cloud file does not have a .las or .laz extension. :raises ValueError: If the DTM file does not have a .tif extension.

Source code in pyforestscan/handlers.py
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
def validate_extensions(las_file_path, dtm_file_path):
    """
    Check that a point-cloud path ends in .las/.laz and a DTM path ends in .tif.

    Comparison is case-insensitive; only the extensions are inspected, the
    files themselves are not opened.

    :param las_file_path: Path of the point cloud file (.las or .laz).
    :type las_file_path: str
    :param dtm_file_path: Path of the DTM (Digital Terrain Model) file (.tif).
    :type dtm_file_path: str
    :raises ValueError: If either path carries an unsupported extension.
    """
    point_cloud_exts = ('.las', '.laz')
    if not las_file_path.lower().endswith(point_cloud_exts):
        raise ValueError("The point cloud file must be a .las or .laz file.")
    if not dtm_file_path.lower().endswith('.tif'):
        raise ValueError("The DTM file must be a .tif file.")

write_las(arrays, output_file, srs=None, compress=True)

Writes point cloud data to a LAS or LAZ file.

:param arrays: The point cloud data arrays. :param output_file: The path of the output file. :param srs: Optional; Spatial Reference System to reproject the data. :param compress: Optional; Boolean flag to compress the output. Defaults to True. :raises ValueError: If 'compress' is True and output file extension is not .laz. :raises ValueError: If 'compress' is False and output file extension is not .las.

:return: None

Source code in pyforestscan/handlers.py
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
def write_las(arrays, output_file, srs=None, compress=True):
    """
    Writes point cloud data to a LAS or LAZ file.

    :param arrays: The point cloud data arrays (a single array or a list of arrays).
    :param output_file: The path of the output file.
    :param srs: Optional; Spatial Reference System to tag/reproject the data.
    :param compress: Optional; Boolean flag to compress the output. Defaults to True.
    :raises ValueError: If 'compress' is True and output file extension is not .laz.
    :raises ValueError: If 'compress' is False and output file extension is not .las.

    :return: None
    """
    output_extension = os.path.splitext(output_file)[1].lower()

    # Extension must agree with the compression flag; PDAL's LAS writer infers
    # LAZ compression from the .laz extension.
    if compress and output_extension != '.laz':
        raise ValueError("If 'compress' is True, output file must have a .laz extension.")
    if not compress and output_extension != '.las':
        raise ValueError("If 'compress' is False, output file must have a .las extension.")

    pipeline_steps = []

    if srs:
        # NOTE(review): in_srs and out_srs are both set to `srs`, so this
        # reprojection is a no-op that only (re)tags the data's SRS — confirm
        # whether a genuine source->target reprojection was intended.
        pipeline_steps.append({
            "type": "filters.reprojection",
            "in_srs": srs,
            "out_srs": srs
        })

    # Both LAS and LAZ output go through writers.las (previously duplicated
    # across the compress branches).
    pipeline_steps.append({
        "type": "writers.las",
        "filename": output_file,
        "minor_version": "4",
        "extra_dims": "all"
    })

    pipeline_def = {
        "pipeline": pipeline_steps
    }

    pipeline_json = json.dumps(pipeline_def)

    # PDAL expects a list of structured arrays.
    if not isinstance(arrays, list):
        arrays = [arrays]

    pipeline = pdal.Pipeline(pipeline_json, arrays=arrays)
    pipeline.execute()