```1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 ``` ```(* A Vantage Point Tree is a Binary Space Partitioning structure which allows * for fast nearest neighbour searches in a general metric space. In a VPTree * we select a distinguished point and call it the vantage point. The remaining * points may then be divided into two groups. Those which are within distance * d of the vantage point and those which are not. These two groups are referred * to as the inner and outer points respectively. This subdivision is performed * recursively on both the inner and outer points to form a tree. * * When searching a VPTree we use distance from the vantage point to cull the * search space. Assume we wish to find the nearest neighbour of a point q * and let the closest point to q found so far be denoted by c. If q is far * enough away from the vantage point and c is sufficiently close to q then it * is unnecessary to search the inner points as all such points will be further * away from q then c. Similarly, if q is close enough to the vantage point and * c is close enough to q it is unnecessary to search the outer points. * * Because VPTree's do not rely on the representation of the points stored and * require only the ability to compute distances between points they are well * suited for storage of points in non-euclidean spaces. For example, a VPTree * may be used to store strings using the edit-distance in a spell checking * application. * * For more details about VPTree's see: * http://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.41.4193 *) (* The representation of VPTree's used is an optimization of the idea outlined * above. As in the above idea we split points into two groups. Those which * are within distance d of the vantage point and those which are further * then d from the vantage point. In contrast to the above description, * however, we don't store d but instead store the distances to the closest * and furthest point from the vantage point in both the inner and outer * groups. This information allows for more aggressive pruning of the search * space during nearest neighbour queries. *) type VPTree< 'a , 'b > = (*private*) Node of 'a * BoundedTree< 'a , 'b > * BoundedTree< 'a , 'b > | Pair of 'a * 'a | Leaf of 'a and (*private*) BoundedTree< 'a , 'b > = { tree : VPTree< 'a , 'b > ; innerRadius : 'b ; outerRadius : 'b } (* Before proceeding with the implementation of the VPTree we some additional * functions are required. These are a partition function and a selection * function. Due to the size of the datasets involved we cannot afford to copy * the input arrays. As a result both functions perform their work inplace and * act on a subset of the input array, [i, j]. * * In addition to partitioning the array the partition function tracks the * smallest and largest value in each partition and ensures that this value * is at the start and end of the corresponding partition. This is necessary * to minimize the number of distance computations performed by the VPTree * implementation as distance computations are usually expensive. * * The selection algorithm used has expected linear time and is a modified * version of the partition based algorithm presented at * http://en.wikipedia.org/wiki/Selection_algorithm *) module Utilities = let rec partition (f : 'a -> 'b) (mu : 'b) (xs : 'a []) (i : int) (j : int) = let swap (i : int) (j : int) = let x = xs.[j] in xs.[j] <- xs.[i] ; xs.[i] <- x let setLeftMin (k : int) = swap i (k - 1) ; swap k i let setLeftMax (k : int) = () let setRightMin (l : int) = () let setRightMax (l : int) = swap j (l + 1) ; swap l j let rec left (min : 'b) (max : 'b) (k : int) = if j < k then k else let x = f xs.[k] if x < mu then if x < min then setLeftMin k ; left x max (k + 1) else if max < x then setLeftMax k ; left min x (k + 1) else swap k (k - 1) ; left min max (k + 1) else swap k j ; partition min max x x k (j - 1) and right (min : 'b) (max : 'b) (l : int) = if l < i then i else let x = f xs.[l] if not (x < mu) then if x < min then setRightMin l ; right x max (l - 1) else if max < x then setRightMax l ; right min x (l - 1) else swap l (l + 1) ; right min max (l - 1) else swap i l ; partition x x min max (i + 1) l and partition (lmin : 'b) (lmax : 'b) (rmin : 'b) (rmax : 'b) (k : int) (l : int) = if l < k then k else let x = f xs.[k] if x < mu then if x < lmin then setLeftMin k ; partition x lmax rmin rmax (k + 1) l else if lmax < x then setLeftMax k ; partition lmin x rmin rmax (k + 1) l else swap k (k - 1) ; partition lmin lmax rmin rmax (k + 1) l else if x < rmin then swap k l ; setRightMin l ; partition lmin lmax x rmax k (l - 1) else if rmax < x then swap k l ; setRightMax l ; partition lmin lmax rmin x k (l - 1) else swap k l ; swap l (l + 1) ; partition lmin lmax rmin rmax k (l - 1) if j < i then i else let x = f xs.[i] if x < mu then left x x (i + 1) else swap i j ; right x x (j - 1) let private selectPivot = let rng = new System.Random(int System.DateTime.Now.Ticks) in fun (i : int) (j : int) -> i + rng.Next(j - i + 1) let rec select (f : 'a -> 'b) (n : int) (xs : 'a []) (i : int) (j : int) : 'b = let k = partition f (f xs.[selectPivot i j]) xs i j if n = k || n = i || n = j || (k > i && n = k - 1) then f xs.[n] else if n < k then select f n xs (i + 1) (k - 1) else select f n xs (k + 1) (j - 1) module VPTree = open Utilities let inline distance (x : ^a) (y : ^a) : ^b = (^a : (static member Distance : ^a * ^a -> ^b)(x, y)) (* VPTree's are constructed using a recursive top down algorithm. First a * vantage point and distance d are selected and the points are divided * into inner and outer groups. Next VPTree's are constructed from these * two groups of points. Finally the resulting VPTree is constructed from * the vantage point and the two recursively constructed VPTree's. * * Two important factors determining the quality of the produced VPTree are * the selection of the vantage point and distance, d. In order to ensure a * balanced tree is produced we always select d to be the median of the * distances from the vantage point. This ensures that half of the points * will end up in the inner group and half in the outer group. * * Selection of the vantage point is more difficult. Ideally we wish to * select the vantage point such that the cost of searches in the resulting * VPTree is minimized. An often used heuristic for achieving this is to * select a vantage point such that the deviation from the median distance * is maximized. This helps to ensure that the inner and outer groups are * well separated. * * Unfortunately, it is not possible to try all vantage points to find the * best one as this would be too expensive, requiring O(n^2) time. To avoid * this problem we take a small random sample of points and let the vantage * point be the best point in this set. *) let inline private median (f : 'a -> 'b) (xs : 'a []) (i : int) (j : int) : 'b = select f (i + (j - i + 1) / 2) xs i j let inline private sndMoment (vp : 'a) (mu : 'b) (xs : 'a []) : float = let square x = x * x in Array.sumBy (fun x -> square (float (distance vp x - mu))) xs let private sample< 'a > = let rng = new System.Random(int System.DateTime.Now.Ticks) in fun (xs : 'a []) (i : int) (j : int) -> Seq.initInfinite (fun _ -> i + rng.Next(j - i + 1)) |> Seq.distinct |> Seq.take (j - i + 1) let rec inline private selectVantagePoint (xs : 'a []) (i : int) (j : int) : int = let candidates = max 10 (min ((j - i + 1) / 50) 100) let samples = max 20 (min ((j - i + 1) / 10) 200) let vantagePoints = sample xs i j |> Seq.truncate candidates |> Seq.map (fun vpi -> let vp = xs.[vpi] let is = Seq.truncate samples (sample xs i j) let ss = Array.ofSeq <| Seq.map (Array.get xs) is let mu = select (distance vp) (ss.Length / 2) ss 0 (ss.Length - 1) let sp = sndMoment vp mu ss (vpi, sp)) fst (Seq.maxBy snd vantagePoints) (**************************************************************************) let inline ofArray (xs : 'a []) : VPTree< 'a , 'b > = let rec ofArray (i : int) (j : int) = if j - i = 0 then Leaf(xs.[i]) else if j - i = 1 then Pair(xs.[i], xs.[j]) else let k = selectVantagePoint xs i j let vp = xs.[k] in xs.[k] <- xs.[i] ; xs.[i] <- vp (* If we select a really bad pivot and all elements of the * input array are the same distance from vp this will fail. * Fixing this isn't hard but it is annoying. As such it's * left as an exercise for the reader. *) let mu = median (distance vp) xs (i + 1) j let k = partition (distance vp) mu xs (i + 1) j let inner = { innerRadius = distance vp xs.[i + 1] ; outerRadius = distance vp xs.[k - 1] ; tree = ofArray (i + 1) (k - 1) } let outer = { innerRadius = distance vp xs.[k] ; outerRadius = distance vp xs.[j] ; tree = ofArray k j } Node(vp, inner, outer) ofArray 0 (xs.Length - 1) (**************************************************************************) (* Computation of the nearest neighbour to a given query point uses a * recursive branch and bound algorithm. Subtree's are eliminated from * the search by employing the triangle rule to determine cases when no * element of a given subtree can possibly be the closest point to the * query. *) (* v = vantage point * q = query point * c = closest so far * x = true closest * * d(q, x) <= d(q, c) * * d(v, q) <= d(v, x) + d(x, q) * d(v, q) <= d(v, x) + d(q, c) * d(v, q) - d(q, c) <= d(v, x) * * d(v, x) <= d(v, q) + d(q, x) * d(v, x) <= d(v, q) + d(q, c) * * iu < d(v, q) - d(q, c) <= d(v, x) --> not in inner * ou < d(v, q) - d(q, c) <= d(v, x) --> not in inner or outer * d(v, x) <= d(v, q) + d(q, c) < il --> not in inner or outer * d(v, x) <= d(v, q) + d(q, c) < ol --> not in outer *) let inline nearest (q : 'a) (vptree : VPTree< 'a , 'b >) = let rec nearest (dqc : 'b) (c : 'a) (vptree : VPTree< 'a , 'b >) : 'b * 'a = match vptree with Node(v, inner, outer) -> let dvq = distance v q let c = if dvq < dqc then v else c let dqc = min dvq dqc if outer.outerRadius < dvq - dqc then (dqc, c) else if dvq + dqc < inner.innerRadius then (dqc, c) else if inner.outerRadius < dvq - dqc then nearest dqc c outer.tree else if dvq + dqc < outer.innerRadius then nearest dqc c inner.tree else (* The search order for the two subtrees is determined * by a simple heuristic. *) if dvq + dvq < inner.outerRadius + outer.innerRadius then let (dqc, c) = nearest dqc c inner.tree let (dqc, c) = nearest dqc c outer.tree (dqc, c) else let (dqc, c) = nearest dqc c outer.tree let (dqc, c) = nearest dqc c inner.tree (dqc, c) | Pair(x, y) -> let dqx = distance q x let dqy = distance q y let c = if dqx < dqc then x else c let dqc = min dqx dqc let c = if dqy < dqc then y else c let dqc = min dqy dqc (dqc, c) | Leaf(x) -> let dqx = distance q x let c = if dqx < dqc then x else c let dqc = min dqx dqc (dqc, c) match vptree with Node(v, _, _) -> snd (nearest (distance q v) v vptree) | Leaf(x) -> x | Pair(x, y) -> let dqx = distance q x let dqy = distance q y if dqx < dqy then x else y module Example = open VPTree type Vector = V of int with static member Distance(V x, V y) = abs (x - y) let rng = new System.Random(int System.DateTime.Now.Ticks) let xs = Seq.distinct (List.init 1000 (fun i -> V <| rng.Next(i * i))) let vptree = VPTree.ofArray (Array.ofSeq xs) do printfn "%A" (List.map (fun i -> VPTree.nearest (V i) vptree) [ 1 ; 10 ; 100 ; 1000 ; 10000 ]) do printfn "%A" (List.init 10 (fun i -> VPTree.nearest (V (rng.Next(1000 * 1000))) vptree)) ```