'''
Pointcloud functions for reading/writing LAS files, and functions for dealing
with the spatial reference system.
'''
from __future__ import print_function
import liblas
import pcl
import os
import numpy as np
import time
from patty.srs import force_srs, is_registered
from sklearn.decomposition import PCA
def _check_readable(filepath):
"""
Test whether filepath is readable, raises IOError otherwise
"""
with open(filepath):
pass
def _check_writable(filepath):
"""
Test whether filepath is writable, raises IOError otherwise
"""
# either the path exists but is not writable, or the path does not exist
# and the parent is not writable.
filepath = os.path.abspath(filepath)
if (os.path.exists(filepath) and (
not os.path.isfile(filepath) or
not os.access(filepath, os.W_OK)
)) or not os.access(os.path.dirname(filepath), os.W_OK | os.X_OK):
raise IOError("Cannot save to " + filepath)
[docs]def clone(pc):
"""
Return a copy of a pointcloud, including registration metadata
Arguments:
pc: pcl.PointCloud()
Returns:
cp: pcl.PointCloud()
"""
cp = pcl.PointCloud(np.asarray(pc))
if is_registered(pc):
force_srs(cp, same_as=pc)
return cp
[docs]def load(path, format=None, load_rgb=True):
"""
Read a pointcloud file.
Supports LAS and CSV files, and lets PCD and PLY files be
read by python-pcl.
Arguments:
path : string
Filename.
format : string, optional
File format: "PLY", "PCD", "LAS", "CSV",
or None to detect the format from the file extension.
load_rgb : bool
Whether RGB is loaded for PLY and PCD files. For LAS files, RGB is
always read.
Returns:
pc : pcl.PointCloud
"""
if format == 'las' or format is None and path.endswith('.las'):
pc = _load_las(path)
elif format == 'las' or format is None and path.endswith('.csv'):
pc = _load_csv(path)
else:
_check_readable(path)
pc = pcl.load(path, format=format, loadRGB=load_rgb)
return pc
[docs]def save(cloud, path, format=None, binary=False, las_header=None):
"""Save a pointcloud to file.
Supports LAS and CSV files, and lets PCD and PLY
files be saved by python-pcl.
Arguments:
cloud : pcl.PointCloud or pcl.PointCloudXYZRGB
Pointcloud to save.
path : string
Filename.
format : string
File format: "PLY", "PCD", "LAS", "CSV",
or None to detect the format from the file extension.
binary : boolean
Whether PLY and PCD files are saved in binary format.
las_header: liblas.header.Header
LAS header to use. When none, a default header is created by
make_las_header(). Default: None
"""
if format == 'las' or format is None and path.endswith('.las'):
_save_las(path, cloud, header=las_header)
elif format == 'csv' or format is None and path.endswith('.csv'):
_save_csv(path, cloud)
else:
_check_writable(path)
if is_registered(cloud) and cloud.offset != np.zeros(3):
cloud_array = np.asarray(cloud)
cloud_array += cloud.offset
pcl.save(cloud, path, format=format, binary=binary)
def _load_las(lasfile):
"""Read a LAS file
Returns:
registered pointcloudxyzrgb
The pointcloud has color and XYZ coordinates, and the offset and precision
set.
"""
_check_readable(lasfile)
las = None
try:
las = liblas.file.File(lasfile)
lsrs = las.header.get_srs()
lsrs = lsrs.get_wkt()
n_points = las.header.get_count()
precise_points = np.zeros((n_points, 6), dtype=np.float64)
for i, point in enumerate(las):
precise_points[i] = (point.x, point.y, point.z,
point.color.red / 256,
point.color.green / 256,
point.color.blue / 256)
# reduce the offset to decrease floating point errors
bbox = BoundingBox(points=precise_points[:, 0:3])
center = bbox.center
precise_points[:, 0:3] -= center
pointcloud = pcl.PointCloudXYZRGB(precise_points.astype(np.float32))
force_srs(pointcloud, srs=lsrs, offset=center)
finally:
if las is not None:
las.close()
return pointcloud
def _load_csv(path, delimiter=','):
"""
Load a set of points from a CSV file as a pointcloud
Returns:
pc : pcl.PointCloud
"""
precise_points = np.genfromtxt(path, delimiter=delimiter, dtype=np.float64)
offset = np.mean(precise_points, axis=0, dtype=np.float64)
pc = pcl.PointCloud(np.array(precise_points - offset, dtype=np.float32))
force_srs(pc, offset=offset)
return pc
def _save_csv(path, pc, delimiter=', '):
"""
Write a pointcloud to a CSV file.
Arguments:
path: string
Output filename
pc: pcl.PointCloud
Pointcloud to save
delimiter: string
Field delimiter to use, see np.savetxt documentation.
"""
if not hasattr(pc, 'offset'):
offset = np.zeros(3)
else:
offset = pc.offset
np.savetxt(path, np.asarray(pc) + offset, delimiter=delimiter)
def _save_las(lasfile, pointcloud, header=None):
"""Write a pointcloud to a LAS file
Arguments:
lasfile : string
Filename.
pointcloud : pcl.PointCloud
header : liblas.header.Header, optional
See :func:`make_las_header`. If not given, makes a header using
that function with default settings.
"""
_check_writable(lasfile)
if header is None:
header = make_las_header(pointcloud)
# deal with color
if len(pointcloud[0]) > 3:
do_rgb = True
else:
do_rgb = False
precise_points = np.array(pointcloud, dtype=np.float64)
precise_points /= header.scale
las = None
try:
las = liblas.file.File(lasfile, mode="w", header=header)
for i in xrange(pointcloud.size):
point = liblas.point.Point()
point.x, point.y, point.z = precise_points[i]
if do_rgb:
red, grn, blu = pointcloud[i][3:6]
point.color = liblas.color.Color(
red=int(red) * 256,
green=int(grn) * 256,
blue=int(blu) * 256)
las.write(point)
finally:
if las is not None:
las.close()
[docs]class BoundingBox(object):
'''A bounding box for a sequence of points.
Center, size and diagonal are updated when the minimum or maximum are
updated.
Constructor usage: either set points (any object that is converted to an
NxD array by np.asarray, with D the number of dimensions) or a fixed min
and max.
'''
def __init__(self, points=None, min=None, max=None):
if min is not None and max is not None:
self._min = np.asarray(min, dtype=np.float64)
self._max = np.asarray(max, dtype=np.float64)
elif points is not None:
points_array = np.asarray(points)
self._min = points_array.min(axis=0)
self._max = points_array.max(axis=0)
else:
raise TypeError("Need to give min and max or matrix")
self._reset()
def __str__(self):
return 'BoundingBox <%s - %s>' % (self.min, self.max)
def _reset(self):
self._center = None
self._size = None
@property
def min(self):
return self._min
@min.setter
def min(self, new_min):
self._reset()
self.min = new_min
@property
def max(self):
return self._max
@max.setter
def max(self, new_max):
self._reset()
self.max = new_max
@property
def center(self):
''' Center point of the bounding box'''
if self._center is None:
self._center = (self.min + self.max) / 2.0
return self._center
@property
def size(self):
''' N-dimensional size array '''
if self._size is None:
self._size = self.max - self.min
return self._size
@property
def diagonal(self):
''' Length of the diagonal of the box. '''
return np.linalg.norm(self.size)
[docs] def contains(self, pos):
''' Whether the bounding box contains given position. '''
return np.all((pos[0:3] >= self.min) & (pos[0:3] <= self.max))
[docs]def log(*args, **kwargs):
"""Simple logging function that prints to stdout"""
print(time.strftime("[%F %H:%M:%S]", time.gmtime()), *args, **kwargs)
[docs]def measure_length(pointcloud):
"""Returns the length of a point cloud in its longest direction."""
if len(pointcloud) < 2:
return 0
pca = PCA(n_components=1)
pc_array = np.asarray(pointcloud)
pca.fit(pc_array)
primary_axis = np.dot(pc_array, np.transpose(pca.components_))[:, 0]
return np.max(primary_axis) - np.min(primary_axis)
[docs]def downsample_voxel(pc, voxel_size=0.01):
'''Downsample a pointcloud using a voxel grid filter.
Resulting pointcloud has the same SRS and offset as the input.
Arguments:
pc : pcl.PointCloud
Original pointcloud
float : voxel_size
Grid spacing for the voxel grid
Returns:
pc : pcl.PointCloud
filtered pointcloud
'''
pc_filter = pc.make_voxel_grid_filter()
pc_filter.set_leaf_size(voxel_size, voxel_size, voxel_size)
newpc = pc_filter.filter()
force_srs(newpc, same_as=pc)
return newpc
[docs]def downsample_random(pc, fraction, random_seed=None):
"""Randomly downsample pointcloud to a fraction of its size.
Returns a pointcloud of size fraction * len(pc), rounded to the nearest
integer. Resulting pointcloud has the same SRS and offset as the input.
Use random_seed=k for some integer k to get reproducible results.
Arguments:
pc : pcl.PointCloud
Input pointcloud.
fraction : float
Fraction of points to include.
random_seed : int, optional
Seed to use in random number generator.
Returns:
pcl.Pointcloud
"""
if not 0 < fraction <= 1:
raise ValueError("Expected fraction in (0,1], got %r" % fraction)
rng = np.random.RandomState(random_seed)
k = max(int(round(fraction * len(pc))), 1)
sample = rng.choice(len(pc), k, replace=False)
new_pc = pc.extract(sample)
force_srs(new_pc, same_as=pc)
return new_pc