Source code for astropy.coordinates.matching

# Licensed under a 3-clause BSD style license - see LICENSE.rst

"""
This module contains functions for matching coordinate catalogs.
"""

from typing import NamedTuple

import numpy as np
from numpy.typing import NDArray

from astropy.units import Quantity

from . import Angle
from .representation import UnitSphericalRepresentation
from .sky_coordinate import SkyCoord

__all__ = [
    "CoordinateMatchResult",
    "CoordinateSearchResult",
    "match_coordinates_3d",
    "match_coordinates_sky",
    "search_around_3d",
    "search_around_sky",
]


[docs] class CoordinateMatchResult(NamedTuple): """Results of matching a set of sources to a catalog. See Also -------- astropy.coordinates.match_coordinates_3d astropy.coordinates.match_coordinates_sky SkyCoord.match_to_catalog_3d SkyCoord.match_to_catalog_sky """ indices_to_catalog: NDArray[np.int32] | NDArray[np.int64] """For each source the index of the match in the catalog.""" angular_separation: Angle """The angular separation between each source and its match in the catalog.""" physical_separation: Quantity """The physical separation between each source and its match in the catalog. If the sources or the catalog lack distances then the physical separations are computed assuming all coordinates are points on the unit sphere."""
[docs] def match_coordinates_3d( matchcoord, catalogcoord, nthneighbor=1, storekdtree="kdtree_3d" ): """ Finds the nearest 3-dimensional matches of a coordinate or coordinates in a set of catalog coordinates. This finds the 3-dimensional closest neighbor, which is only different from the on-sky distance if ``distance`` is set in either ``matchcoord`` or ``catalogcoord``. Parameters ---------- matchcoord : `~astropy.coordinates.BaseCoordinateFrame` or `~astropy.coordinates.SkyCoord` The coordinate(s) to match to the catalog. catalogcoord : `~astropy.coordinates.BaseCoordinateFrame` or `~astropy.coordinates.SkyCoord` The base catalog in which to search for matches. Typically this will be a coordinate object that is an array (i.e., ``catalogcoord.isscalar == False``) nthneighbor : int, optional Which closest neighbor to search for. Typically ``1`` is desired here, as that is correct for matching one set of coordinates to another. The next likely use case is ``2``, for matching a coordinate catalog against *itself* (``1`` is inappropriate because each point will find itself as the closest match). storekdtree : bool or str, optional If a string, will store the KD-Tree used for the computation in the ``catalogcoord``, as in ``catalogcoord.cache`` with the provided name. This dramatically speeds up subsequent calls with the same catalog. If False, the KD-Tree is discarded after use. Returns ------- CoordinateMatchResult A `~typing.NamedTuple` with attributes representing for each source in ``matchcoord`` the indices and angular and physical separations of the match in ``catalogcoord``. Notes ----- This function requires `SciPy <https://www.scipy.org/>`_ to be installed or it will fail. """ if catalogcoord.isscalar or len(catalogcoord) < 1: raise ValueError( "The catalog for coordinate matching cannot be a scalar or length-0." ) kdt = _get_cartesian_kdtree(catalogcoord, storekdtree) # make sure coordinate systems match if isinstance(matchcoord, SkyCoord): matchcoord = matchcoord.transform_to(catalogcoord, merge_attributes=False) else: matchcoord = matchcoord.transform_to(catalogcoord) # make sure units match catunit = catalogcoord.cartesian.x.unit matchxyz = matchcoord.cartesian.xyz.to(catunit) matchflatxyz = matchxyz.reshape((3, np.prod(matchxyz.shape) // 3)) # Querying NaN returns garbage if np.isnan(matchflatxyz.value).any(): raise ValueError("Matching coordinates cannot contain NaN entries.") dist, idx = kdt.query(matchflatxyz.T, nthneighbor) if nthneighbor > 1: # query gives 1D arrays if k=1, 2D arrays otherwise dist = dist[:, -1] idx = idx[:, -1] sep2d = catalogcoord[idx].separation(matchcoord) return CoordinateMatchResult( idx.reshape(matchxyz.shape[1:]), sep2d, dist.reshape(matchxyz.shape[1:]) * catunit, )
[docs] def match_coordinates_sky( matchcoord, catalogcoord, nthneighbor=1, storekdtree="kdtree_sky" ): """ Finds the nearest on-sky matches of a coordinate or coordinates in a set of catalog coordinates. This finds the on-sky closest neighbor, which is only different from the 3-dimensional match if ``distance`` is set in either ``matchcoord`` or ``catalogcoord``. Parameters ---------- matchcoord : `~astropy.coordinates.BaseCoordinateFrame` or `~astropy.coordinates.SkyCoord` The coordinate(s) to match to the catalog. catalogcoord : `~astropy.coordinates.BaseCoordinateFrame` or `~astropy.coordinates.SkyCoord` The base catalog in which to search for matches. Typically this will be a coordinate object that is an array (i.e., ``catalogcoord.isscalar == False``) nthneighbor : int, optional Which closest neighbor to search for. Typically ``1`` is desired here, as that is correct for matching one set of coordinates to another. The next likely use case is ``2``, for matching a coordinate catalog against *itself* (``1`` is inappropriate because each point will find itself as the closest match). storekdtree : bool or str, optional If a string, will store the KD-Tree used for the computation in the ``catalogcoord`` in ``catalogcoord.cache`` with the provided name. This dramatically speeds up subsequent calls with the same catalog. If False, the KD-Tree is discarded after use. Returns ------- CoordinateMatchResult A `~typing.NamedTuple` with attributes representing for each source in ``matchcoord`` the indices and angular and physical separations of the match in ``catalogcoord``. If either ``matchcoord`` or ``catalogcoord`` lack distances, the physical separation is the 3D distance on the unit sphere, rather than a true distance. Notes ----- This function requires `SciPy <https://www.scipy.org/>`_ to be installed or it will fail. """ if catalogcoord.isscalar or len(catalogcoord) < 1: raise ValueError( "The catalog for coordinate matching cannot be a scalar or length-0." ) # send to catalog frame if isinstance(matchcoord, SkyCoord): newmatch = matchcoord.transform_to(catalogcoord, merge_attributes=False) else: newmatch = matchcoord.transform_to(catalogcoord) # strip out distance info match_urepr = newmatch.data.represent_as(UnitSphericalRepresentation) newmatch_u = newmatch.realize_frame(match_urepr) cat_urepr = catalogcoord.data.represent_as(UnitSphericalRepresentation) newcat_u = catalogcoord.realize_frame(cat_urepr) # Check for a stored KD-tree on the passed-in coordinate. Normally it will # have a distinct name from the "3D" one, so it's safe to use even though # it's based on UnitSphericalRepresentation. storekdtree = catalogcoord.cache.get(storekdtree, storekdtree) idx, sep2d, sep3d = match_coordinates_3d( newmatch_u, newcat_u, nthneighbor, storekdtree ) # sep3d is *wrong* above, because the distance information was removed, # unless one of the catalogs doesn't have a real distance if not ( isinstance(catalogcoord.data, UnitSphericalRepresentation) or isinstance(newmatch.data, UnitSphericalRepresentation) ): sep3d = catalogcoord[idx].separation_3d(newmatch) # update the kdtree on the actual passed-in coordinate if isinstance(storekdtree, str): catalogcoord.cache[storekdtree] = newcat_u.cache[storekdtree] elif storekdtree is True: # the old backwards-compatible name catalogcoord.cache["kdtree"] = newcat_u.cache["kdtree"] return CoordinateMatchResult(idx, sep2d, sep3d)
[docs] class CoordinateSearchResult(NamedTuple): """Results of searching close pairs between two sets of sources. See Also -------- astropy.coordinates.search_around_3d astropy.coordinates.search_around_sky SkyCoord.search_around_3d SkyCoord.search_around_sky """ indices_to_first_set: NDArray[np.int32] | NDArray[np.int64] """Indices of the elements of the found pairs in the first set of sources.""" indices_to_second_set: NDArray[np.int32] | NDArray[np.int64] """Indices of the elements of the found pairs in the second set of sources.""" angular_separation: Angle """The angular separations between the paired sources.""" physical_separation: Quantity """The physical separations between the paired sources. If either of the source sets lack distances then the physical separations are computed assuming all coordinates are points on the unit sphere."""
[docs] def search_around_3d(coords1, coords2, distlimit, storekdtree="kdtree_3d"): """ Searches for pairs of points that are at least as close as a specified distance in 3D space. This is intended for use on coordinate objects with arrays of coordinates, not scalars. For scalar coordinates, it is better to use the ``separation_3d`` methods. Parameters ---------- coords1 : `~astropy.coordinates.BaseCoordinateFrame` or `~astropy.coordinates.SkyCoord` The first set of coordinates, which will be searched for matches from ``coords2`` within ``seplimit``. Must be a one-dimensional coordinate array. coords2 : `~astropy.coordinates.BaseCoordinateFrame` or `~astropy.coordinates.SkyCoord` The second set of coordinates, which will be searched for matches from ``coords1`` within ``seplimit``. Must be a one-dimensional coordinate array. distlimit : `~astropy.units.Quantity` ['length'] The physical radius to search within. It should be broadcastable to the same shape as ``coords1``. storekdtree : bool or str, optional If a string, will store the KD-Tree used in the search with the name ``storekdtree`` in ``coords2.cache``. This speeds up subsequent calls to this function. If False, the KD-Trees are not saved. Returns ------- CoordinateSearchResult A `~typing.NamedTuple` with attributes representing the indices of the elements of found pairs in both source sets and angular and physical separations of the pairs. Notes ----- This function requires `SciPy <https://www.scipy.org/>`_ to be installed or it will fail. If you are using this function to search in a catalog for matches around specific points, the convention is for ``coords2`` to be the catalog, and ``coords1`` are the points to search around. While these operations are mathematically the same if ``coords1`` and ``coords2`` are flipped, some of the optimizations may work better if this convention is obeyed. In the current implementation, the return values are always sorted in the same order as the ``coords1`` (so ``idx1`` is in ascending order). This is considered an implementation detail, though, so it could change in a future release. """ if coords1.ndim != 1 or coords2.ndim != 1: msg = "search_around_3d only supports 1-dimensional coordinate arrays." if coords1.isscalar or coords2.isscalar: msg += " With a scalar array, use ``coord1.separation(coord2) < seplimit``." raise ValueError(msg) kdt2 = _get_cartesian_kdtree(coords2, storekdtree) cunit = coords2.cartesian.x.unit # we convert coord1 to match coord2's frame. We do it this way # so that if the conversion does happen, the KD tree of coord2 at least gets # saved. (by convention, coord2 is the "catalog" if that makes sense) coords1 = coords1.transform_to(coords2) kdt1 = _get_cartesian_kdtree(coords1, storekdtree, forceunit=cunit) idxs1 = [] idxs2 = [] if distlimit.isscalar: for i, matches in enumerate( kdt1.query_ball_tree(kdt2, distlimit.to_value(cunit)) ): idxs1.extend(len(matches) * [i]) idxs2.extend(matches) else: for i, (point, distance) in enumerate(zip(kdt1.data, distlimit, strict=True)): matches = kdt2.query_ball_point(point, distance.to_value(cunit)) idxs1.extend(len(matches) * [i]) idxs2.extend(matches) return CoordinateSearchResult( np.array(idxs1, dtype=int), np.array(idxs2, dtype=int), coords1[idxs1].separation(coords2[idxs2]), coords1[idxs1].separation_3d(coords2[idxs2]), )
[docs] def search_around_sky(coords1, coords2, seplimit, storekdtree="kdtree_sky"): """ Searches for pairs of points that have an angular separation at least as close as a specified angle. This is intended for use on coordinate objects with arrays of coordinates, not scalars. For scalar coordinates, it is better to use the ``separation`` methods. Parameters ---------- coords1 : coordinate-like The first set of coordinates, which will be searched for matches from ``coords2`` within ``seplimit``. Must be a one-dimensional coordinate array. coords2 : coordinate-like The second set of coordinates, which will be searched for matches from ``coords1`` within ``seplimit``. Must be a one-dimensional coordinate array. seplimit : `~astropy.units.Quantity` ['angle'] The on-sky separation to search within. It should be broadcastable to the same shape as ``coords1``. storekdtree : bool or str, optional If a string, will store the KD-Tree used in the search with the name ``storekdtree`` in ``coords2.cache``. This speeds up subsequent calls to this function. If False, the KD-Trees are not saved. Returns ------- CoordinateSearchResult A `~typing.NamedTuple` with attributes representing the indices of the elements of found pairs in both source sets and angular and physical separations of the pairs. If either set of sources lack distances, the physical separation is the 3D distance on the unit sphere, rather than a true distance. Notes ----- This function requires `SciPy <https://www.scipy.org/>`_ to be installed or it will fail. In the current implementation, the return values are always sorted in the same order as the ``coords1`` (so ``idx1`` is in ascending order). This is considered an implementation detail, though, so it could change in a future release. """ if coords1.ndim != 1 or coords2.ndim != 1: msg = "search_around_sky only supports 1-dimensional coordinate arrays." if coords1.isscalar or coords2.isscalar: msg += " With a scalar array, use ``coord1.separation(coord2) < seplimit``." raise ValueError(msg) # we convert coord1 to match coord2's frame. We do it this way # so that if the conversion does happen, the KD tree of coord2 at least gets # saved. (by convention, coord2 is the "catalog" if that makes sense) coords1 = coords1.transform_to(coords2) # strip out distance info urepr1 = coords1.data.represent_as(UnitSphericalRepresentation) kdt1 = _get_cartesian_kdtree(coords1.realize_frame(urepr1), storekdtree) if storekdtree and coords2.cache.get(storekdtree): # just use the stored KD-Tree kdt2 = coords2.cache[storekdtree] else: # strip out distance info urepr2 = coords2.data.represent_as(UnitSphericalRepresentation) kdt2 = _get_cartesian_kdtree(coords2.realize_frame(urepr2), storekdtree) if storekdtree: coords2.cache["kdtree" if storekdtree is True else storekdtree] = kdt2 idxs1 = [] idxs2 = [] if seplimit.isscalar: # this is the *cartesian* 3D distance that corresponds to the given angle r = (2 * np.sin(Angle(0.5 * seplimit))).value for i, matches in enumerate(kdt1.query_ball_tree(kdt2, r)): idxs1.extend(len(matches) * [i]) idxs2.extend(matches) else: for i, (point, sep) in enumerate(zip(kdt1.data, seplimit, strict=True)): radius = (2 * np.sin(Angle(0.5 * sep))).value matches = kdt2.query_ball_point(point, radius) idxs1.extend(len(matches) * [i]) idxs2.extend(matches) d2ds = coords1[idxs1].separation(coords2[idxs2]) try: d3ds = coords1[idxs1].separation_3d(coords2[idxs2]) except ValueError: # they don't have distances, so we just fall back on the cartesian # distance, computed from d2ds d3ds = 2 * np.sin(0.5 * d2ds) return CoordinateSearchResult( np.array(idxs1, dtype=int), np.array(idxs2, dtype=int), d2ds, d3ds )
def _get_cartesian_kdtree(coord, attrname_or_kdt="kdtree", forceunit=None): """ This is a utility function to retrieve (and build/cache, if necessary) a 3D cartesian KD-Tree from various sorts of astropy coordinate objects. Parameters ---------- coord : `~astropy.coordinates.BaseCoordinateFrame` or `~astropy.coordinates.SkyCoord` The coordinates to build the KD-Tree for. attrname_or_kdt : bool or str or KDTree If a string, will store the KD-Tree used for the computation in the ``coord``, in ``coord.cache`` with the provided name. If given as a KD-Tree, it will just be used directly. forceunit : unit or None If a unit, the cartesian coordinates will convert to that unit before being put in the KD-Tree. If None, whatever unit it's already in will be used Returns ------- kdt : `~scipy.spatial.KDTree` The KD-Tree representing the 3D cartesian representation of the input coordinates. """ from scipy.spatial import KDTree if attrname_or_kdt is True: # backwards compatibility for pre v0.4 attrname_or_kdt = "kdtree" # figure out where any cached KDTree might be if isinstance(attrname_or_kdt, str): kdt = coord.cache.get(attrname_or_kdt, None) if kdt is not None and not isinstance(kdt, KDTree): raise TypeError( f'The `attrname_or_kdt` "{attrname_or_kdt}" is not a scipy KD tree!' ) elif isinstance(attrname_or_kdt, KDTree): kdt = attrname_or_kdt attrname_or_kdt = None elif not attrname_or_kdt: kdt = None else: raise TypeError( "Invalid `attrname_or_kdt` argument for KD-Tree:" + str(attrname_or_kdt) ) if kdt is None: # need to build the cartesian KD-tree for the catalog if forceunit is None: cartxyz = coord.cartesian.xyz else: cartxyz = coord.cartesian.xyz.to(forceunit) flatxyz = cartxyz.reshape((3, np.prod(cartxyz.shape) // 3)) # There should be no NaNs in the kdtree data. if np.isnan(flatxyz.value).any(): raise ValueError("Catalog coordinates cannot contain NaN entries.") # Not obvious if compact_nodes=False, balanced_tree=False is still needed but # we stay backwards-compatible with previous versions of `astropy` for now. kdt = KDTree(flatxyz.value.T, compact_nodes=False, balanced_tree=False) if attrname_or_kdt: # cache the kdtree in `coord` coord.cache[attrname_or_kdt] = kdt return kdt