from __future__ import print_function
from numpy import asarray, unravel_index, prod, mod, ndarray, ceil, where, \
r_, sort, argsort, array, random, arange, ones, expand_dims
from itertools import groupby
from bolt.base import BoltArray
from bolt.spark.stack import StackedArray
from bolt.spark.utils import zip_with_index
from bolt.spark.statcounter import StatCounter
from bolt.utils import slicify, listify, tupleize, argpack, inshape, istransposeable, isreshapeable
class BoltArraySpark(BoltArray):
_metadata = BoltArray._metadata + ['_shape', '_split', '_dtype']
def __init__(self, rdd, shape=None, split=None, dtype=None):
self._rdd = rdd
self._shape = shape
self._split = split
self._dtype = dtype
self._mode = 'spark'
@property
def _constructor(self):
return BoltArraySpark
def __array__(self):
return self.toarray()
    def cache(self):
"""
Cache the underlying RDD in memory.
"""
self._rdd.cache()
    def unpersist(self):
"""
Remove the underlying RDD from memory.
"""
self._rdd.unpersist()
    def stack(self, size=None):
"""
Aggregates records of a distributed array.
Stacking should improve the performance of vectorized operations,
but the resulting StackedArray object only exposes a restricted set
of operations (e.g. map, reduce). The unstack method can be used
to restore the full bolt array.
Parameters
----------
        size : int, optional, default=None
            The maximum size for each stack (number of original records);
            records will be aggregated per partition in groups up to this
            size. If None, all records on each partition will be aggregated.
Returns
-------
StackedArray
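        Examples
        --------
        A minimal sketch, assuming ``sc`` is a live SparkContext and the
        top-level ``bolt.array`` constructor (illustrative only)::

            import bolt
            from numpy import arange
            x = bolt.array(arange(24).reshape(2, 3, 4), sc)
            stacked = x.stack(size=2)          # StackedArray
            y = stacked.map(lambda v: v * 2)   # vectorized over stacks
            z = y.unstack()                    # back to a BoltArraySpark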
"""
stk = StackedArray(self._rdd, shape=self.shape, split=self.split)
return stk.stack(size)
def _align(self, axis):
"""
Align spark bolt array so that axes for iteration are in the keys.
This operation is applied before most functional operators.
It ensures that the specified axes are valid, and swaps
key/value axes so that functional operators can be applied
over the correct records.
Parameters
----------
        axis : tuple[int]
            One or more axes that will be iterated over by a functional operator
Returns
-------
BoltArraySpark
"""
# ensure that the specified axes are valid
inshape(self.shape, axis)
# find the value axes that should be moved into the keys (axis >= split)
tokeys = [(a - self.split) for a in axis if a >= self.split]
# find the key axes that should be moved into the values (axis < split)
tovalues = [a for a in range(self.split) if a not in axis]
if tokeys or tovalues:
return self.swap(tovalues, tokeys)
else:
return self
    def first(self):
"""
Return the first element of an array
"""
from bolt.local.array import BoltArrayLocal
return BoltArrayLocal(self._rdd.values().first())
    def map(self, func, axis=(0,), value_shape=None):
"""
Apply a function across an axis.
Array will be aligned so that the desired set of axes
are in the keys, which may incur a swap.
Parameters
----------
func : function
Function of a single array to apply
axis : tuple or int, optional, default=(0,)
Axis or multiple axes to apply function along.
value_shape : tuple, optional, default=None
Known shape of values resulting from operation
Returns
-------
BoltArraySpark
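        Examples
        --------
        A minimal sketch, assuming ``sc`` is a SparkContext and ``bolt.array``
        for construction (illustrative only)::

            import bolt
            from numpy import arange
            x = bolt.array(arange(24).reshape(2, 3, 4), sc)
            y = x.map(lambda v: v.sum(axis=0), axis=(0,))
            y.shape      # (2, 4): one (4,) result per record along axis 0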
"""
axis = tupleize(axis)
swapped = self._align(axis)
if value_shape is None:
            # infer the shape of mapped elements by applying func to random data,
            # falling back to evaluating func on the first record
            try:
                value_shape = func(random.randn(*swapped.values.shape).astype(self.dtype)).shape
            except Exception:
                first = swapped._rdd.first()
                if first:
                    # evaluate func on the first record
                    mapped = func(first[1])
                    value_shape = mapped.shape
shape = tuple([swapped._shape[ax] for ax in range(len(axis))]) + tupleize(value_shape)
rdd = swapped._rdd.mapValues(func)
# reshaping will fail if the elements aren't uniformly shaped
def check(v):
if len(v.shape) > 0 and v.shape != tupleize(value_shape):
raise Exception("Map operation did not produce values of uniform shape.")
return v
rdd = rdd.mapValues(lambda v: check(v))
return self._constructor(rdd, shape=shape, split=swapped.split).__finalize__(swapped)
    def filter(self, func, axis=(0,)):
"""
Filter array along an axis.
Applies a function which should evaluate to boolean,
along a single axis or multiple axes. Array will be
aligned so that the desired set of axes are in the keys,
which may incur a swap.
Parameters
----------
func : function
Function to apply, should return boolean
axis : tuple or int, optional, default=(0,)
Axis or multiple axes to filter along.
Returns
-------
BoltArraySpark
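        Examples
        --------
        A minimal sketch, assuming ``sc`` is a SparkContext and ``bolt.array``
        for construction (illustrative only)::

            import bolt
            from numpy import arange
            x = bolt.array(arange(24).reshape(2, 3, 4), sc)
            y = x.filter(lambda v: v.sum() > 100, axis=(0,))
            y.shape      # (1, 3, 4): only one record passes the predicate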
"""
axis = tupleize(axis)
if len(axis) != 1:
raise NotImplementedError("Filtering over multiple axes will not be "
"supported until SparseBoltArray is implemented.")
swapped = self._align(axis)
rdd = swapped._rdd.values().filter(func)
# count the resulting array in order to reindex (linearize) the keys
count, zipped = zip_with_index(rdd)
if not count:
count = zipped.count()
reindexed = zipped.map(lambda kv: (kv[1], kv[0]))
# since we can only filter over one axis, the remaining shape is always the following
remaining = list(swapped.shape[1:])
if count != 0:
shape = tuple([count] + remaining)
else:
shape = (0,)
return self._constructor(reindexed, shape=shape, split=swapped.split).__finalize__(swapped)
    def reduce(self, func, axis=(0,), keepdims=False):
"""
Reduce an array along an axis.
Applies a commutative/associative function of two
arguments cumulatively to all arrays along an axis.
Array will be aligned so that the desired set of axes
are in the keys, which may incur a swap.
Parameters
----------
func : function
Function of two arrays that returns a single array
axis : tuple or int, optional, default=(0,)
            Axis or multiple axes to reduce along.
        keepdims : boolean, optional, default=False
            Keep the reduced axes in the result as dimensions with size 1.
        Returns
        -------
        BoltArrayLocal or scalar
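        Examples
        --------
        A minimal sketch, assuming ``sc`` is a SparkContext and ``bolt.array``
        for construction (illustrative only)::

            import bolt
            from numpy import arange
            x = bolt.array(arange(24).reshape(2, 3, 4), sc)
            y = x.reduce(lambda a, b: a + b, axis=(0,))
            y.shape      # (3, 4): records along axis 0 summed elementwise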
"""
from bolt.local.array import BoltArrayLocal
axis = tupleize(axis)
swapped = self._align(axis)
arr = swapped._rdd.values().reduce(func)
if keepdims:
for i in axis:
arr = expand_dims(arr, axis=i)
if not isinstance(arr, ndarray):
# the result of a reduce can also be a scalar
return arr
elif arr.shape == (1,):
# ndarrays with single values in them should be converted into scalars
return arr[0]
return BoltArrayLocal(arr)
def _stat(self, axis=None, func=None, name=None, keepdims=False):
"""
Compute a statistic over an axis.
Can provide either a function (for use in a reduce)
or a name (for use by a stat counter).
Parameters
----------
axis : tuple or int, optional, default=None
Axis to compute statistic over, if None
will compute over all axes
func : function, optional, default=None
Function for reduce, see BoltArraySpark.reduce
        name : str, optional, default=None
A named statistic, see StatCounter
keepdims : boolean, optional, default=False
            Keep the reduced axes in the result as dimensions with size 1.
"""
if axis is None:
axis = list(range(len(self.shape)))
axis = tupleize(axis)
if func and not name:
return self.reduce(func, axis, keepdims)
if name and not func:
from bolt.local.array import BoltArrayLocal
swapped = self._align(axis)
def reducer(left, right):
return left.combine(right)
counter = swapped._rdd.values()\
.mapPartitions(lambda i: [StatCounter(values=i, stats=name)])\
.reduce(reducer)
arr = getattr(counter, name)
if keepdims:
for i in axis:
arr = expand_dims(arr, axis=i)
return BoltArrayLocal(arr).toscalar()
else:
raise ValueError('Must specify either a function or a statistic name.')
    def mean(self, axis=None, keepdims=False):
"""
Return the mean of the array over the given axis.
Parameters
----------
axis : tuple or int, optional, default=None
Axis to compute statistic over, if None
will compute over all axes
keepdims : boolean, optional, default=False
            Keep the reduced axes in the result as dimensions with size 1.
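        Examples
        --------
        A minimal sketch, assuming ``sc`` is a SparkContext and ``bolt.array``
        for construction (illustrative only)::

            import bolt
            from numpy import arange
            x = bolt.array(arange(24).reshape(2, 3, 4), sc)
            x.mean()           # 11.5, a scalar over all axes
            x.mean(axis=0)     # BoltArrayLocal with shape (3, 4)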
"""
return self._stat(axis, name='mean', keepdims=keepdims)
    def var(self, axis=None, keepdims=False):
"""
Return the variance of the array over the given axis.
Parameters
----------
axis : tuple or int, optional, default=None
Axis to compute statistic over, if None
will compute over all axes
keepdims : boolean, optional, default=False
            Keep the reduced axes in the result as dimensions with size 1.
"""
return self._stat(axis, name='variance', keepdims=keepdims)
    def std(self, axis=None, keepdims=False):
"""
Return the standard deviation of the array over the given axis.
Parameters
----------
axis : tuple or int, optional, default=None
Axis to compute statistic over, if None
will compute over all axes
keepdims : boolean, optional, default=False
            Keep the reduced axes in the result as dimensions with size 1.
"""
return self._stat(axis, name='stdev', keepdims=keepdims)
    def sum(self, axis=None, keepdims=False):
"""
Return the sum of the array over the given axis.
Parameters
----------
axis : tuple or int, optional, default=None
Axis to compute statistic over, if None
will compute over all axes
keepdims : boolean, optional, default=False
            Keep the reduced axes in the result as dimensions with size 1.
"""
from operator import add
return self._stat(axis, func=add, keepdims=keepdims)
    def max(self, axis=None, keepdims=False):
"""
Return the maximum of the array over the given axis.
Parameters
----------
axis : tuple or int, optional, default=None
Axis to compute statistic over, if None
will compute over all axes
keepdims : boolean, optional, default=False
            Keep the reduced axes in the result as dimensions with size 1.
"""
from numpy import maximum
return self._stat(axis, func=maximum, keepdims=keepdims)
    def min(self, axis=None, keepdims=False):
"""
Return the minimum of the array over the given axis.
Parameters
----------
axis : tuple or int, optional, default=None
Axis to compute statistic over, if None
will compute over all axes
keepdims : boolean, optional, default=False
            Keep the reduced axes in the result as dimensions with size 1.
"""
from numpy import minimum
return self._stat(axis, func=minimum, keepdims=keepdims)
    def concatenate(self, arry, axis=0):
        """
        Join this array with another array.
        Parameters
        ----------
arry : ndarray, BoltArrayLocal, or BoltArraySpark
Another array to concatenate with
axis : int, optional, default=0
The axis along which arrays will be joined.
Returns
-------
BoltArraySpark
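        Examples
        --------
        A minimal sketch, assuming ``sc`` is a SparkContext and ``bolt.array``
        for construction (illustrative only)::

            import bolt
            from numpy import arange
            x = bolt.array(arange(24).reshape(2, 3, 4), sc)
            x.concatenate(x, axis=0).shape    # (4, 3, 4)
            x.concatenate(x, axis=2).shape    # (2, 3, 8)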
"""
if isinstance(arry, ndarray):
from bolt.spark.construct import ConstructSpark
arry = ConstructSpark.array(arry, self._rdd.context, axis=range(0, self.split))
else:
if not isinstance(arry, BoltArraySpark):
raise ValueError("other must be local array or spark array, got %s" % type(arry))
if not all([x == y if not i == axis else True
for i, (x, y) in enumerate(zip(self.shape, arry.shape))]):
raise ValueError("all the input array dimensions except for "
"the concatenation axis must match exactly")
if not self.split == arry.split:
raise NotImplementedError("two arrays must have the same split ")
if axis < self.split:
shape = self.keys.shape
def key_func(key):
key = list(key)
key[axis] += shape[axis]
return tuple(key)
rdd = self._rdd.union(arry._rdd.map(lambda kv: (key_func(kv[0]), kv[1])))
else:
from numpy import concatenate as npconcatenate
shift = axis - self.split
rdd = self._rdd.join(arry._rdd).map(lambda kv: (kv[0], npconcatenate(kv[1], axis=shift)))
shape = tuple([x + y if i == axis else x
for i, (x, y) in enumerate(zip(self.shape, arry.shape))])
return self._constructor(rdd, shape=shape).__finalize__(self)
def _getbasic(self, index):
"""
Basic indexing (for slices or ints).
"""
index = tuple([slicify(s, d) for (s, d) in zip(index, self.shape)])
key_slices = index[0:self.split]
value_slices = index[self.split:]
def key_check(key):
check = lambda kk, ss: ss.start <= kk < ss.stop and mod(kk - ss.start, ss.step) == 0
out = [check(k, s) for k, s in zip(key, key_slices)]
return all(out)
def key_func(key):
            # use floor division so keys stay integers under Python 3
            return tuple([(k - s.start) // s.step for k, s in zip(key, key_slices)])
filtered = self._rdd.filter(lambda kv: key_check(kv[0]))
rdd = filtered.map(lambda kv: (key_func(kv[0]), kv[1][value_slices]))
shape = tuple([int(ceil((s.stop - s.start) / float(s.step))) for s in index])
split = self.split
return rdd, shape, split
def _getadvanced(self, index):
"""
Advanced indexing (for sets, lists, or ndarrays).
"""
index = [asarray(i) for i in index]
shape = index[0].shape
if not all([i.shape == shape for i in index]):
raise ValueError("shape mismatch: indexing arrays could not be broadcast "
"together with shapes " + ("%s " * self.ndim)
% tuple([i.shape for i in index]))
index = tuple([listify(i, d) for (i, d) in zip(index, self.shape)])
# build tuples with target indices
key_tuples = list(zip(*index[0:self.split]))
value_tuples = list(zip(*index[self.split:]))
# build dictionary to look up targets in values
d = {}
for k, g in groupby(zip(value_tuples, key_tuples), lambda x: x[1]):
            # materialize as a list (a map object is not picklable for the Spark closure)
            d[k] = [x[0] for x in g]
def key_check(key):
return key in key_tuples
def key_func(key):
return unravel_index(key, shape)
# filter records based on key targets
filtered = self._rdd.filter(lambda kv: key_check(kv[0]))
# subselect and flatten records based on value targets (if they exist)
if len(value_tuples) > 0:
flattened = filtered.flatMap(lambda kv: [(kv[0], kv[1][i]) for i in d[kv[0]]])
else:
flattened = filtered
# reindex
indexed = flattened.zipWithIndex()
rdd = indexed.map(lambda kkv: (key_func(kkv[1]), kkv[0][1]))
split = len(shape)
return rdd, shape, split
def __getitem__(self, index):
"""
Get an item from the array through indexing.
Supports basic indexing with slices and ints, or advanced
indexing with lists or ndarrays of integers.
Mixing basic and advanced indexing across axes is not
currently supported.
Parameters
----------
        index : tuple of slices, ints, lists, sets, or ndarrays
One or more index specifications
Returns
-------
        BoltArraySpark
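        Examples
        --------
        A minimal sketch, assuming ``sc`` is a SparkContext and ``bolt.array``
        for construction (illustrative only)::

            import bolt
            from numpy import arange
            x = bolt.array(arange(24).reshape(2, 3, 4), sc)
            x[0, :, :].shape           # (3, 4): int axes are squeezed out
            x[:, 0:2, :].shape         # (2, 2, 4)
            x[[0, 1], [0, 1], [0, 1]]  # advanced indexing, shape (2,)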
"""
index = tupleize(index)
if len(index) > self.ndim:
raise ValueError("Too many indices for array")
if not all([isinstance(i, (slice, int, list, set, ndarray)) for i in index]):
raise ValueError("Each index must either be a slice, int, list, set, or ndarray")
# fill unspecified axes with full slices
if len(index) < self.ndim:
index += tuple([slice(0, None, None) for _ in range(self.ndim - len(index))])
# convert ints to lists if not all ints and slices
if not all([isinstance(i, (int, slice)) for i in index]):
index = tuple([[i] if isinstance(i, int) else i for i in index])
# select basic or advanced indexing
if all([isinstance(i, (slice, int)) for i in index]):
rdd, shape, split = self._getbasic(index)
elif all([isinstance(i, (set, list, ndarray)) for i in index]):
rdd, shape, split = self._getadvanced(index)
else:
raise NotImplementedError("Cannot mix basic indexing (slices and ints) with "
"advanced indexing (lists and ndarrays) across axes")
result = self._constructor(rdd, shape=shape, split=split).__finalize__(self)
# squeeze out int dimensions (and squeeze to singletons if all ints)
if all([isinstance(i, int) for i in index]):
return result.squeeze().toarray()[()]
else:
tosqueeze = tuple([k for k, i in enumerate(index) if isinstance(i, int)])
return result.squeeze(tosqueeze)
    def chunk(self, size="150", axis=None):
        """
        Chunks records of a distributed array.
        Chunking breaks arrays into subarrays, using a specified
        chunk size along each value dimension. Alternatively, an
        average chunk byte size (in megabytes) can be given, and the
        chunk sizes (as ints) will be computed automatically.
        Parameters
        ----------
        size : tuple, int, or str, optional, default = "150"
            A string giving the size in megabytes, or an int or tuple
            giving the size of chunks along each dimension.
        axis : int or tuple, optional, default = None
            One or more axes to chunk the array along; if None,
            all axes will be used.
Returns
-------
ChunkedArray
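        Examples
        --------
        A minimal sketch, assuming ``sc`` is a SparkContext and ``bolt.array``
        for construction (illustrative only)::

            import bolt
            from numpy import arange
            x = bolt.array(arange(24).reshape(2, 3, 4), sc)
            c = x.chunk(size=(2, 2))    # ChunkedArray with 2x2 value chunks
            y = c.unchunk()             # back to an equivalent BoltArraySpark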
"""
        if type(size) is not str:
            size = tupleize(size)
        axis = tupleize(axis)
from bolt.spark.chunk import ChunkedArray
chnk = ChunkedArray(rdd=self._rdd, shape=self._shape, split=self._split, dtype=self._dtype)
return chnk._chunk(size, axis)
    def swap(self, kaxes, vaxes, size="150"):
        """
        Swap axes from keys to values.
        This is the core operation underlying shape manipulation
        on the Spark bolt array. It exchanges an arbitrary set of axes
        between the keys and the values. If either is None, axes will only
        be moved in one direction (from keys to values, or values to keys).
        Keys moved to values will be placed immediately after the split;
        values moved to keys will be placed immediately before the split.
Parameters
----------
kaxes : tuple
Axes from keys to move to values
vaxes : tuple
Axes from values to move to keys
        size : str, tuple, or int, optional, default = "150"
Can either provide a string giving the size in megabytes,
or a tuple with the number of chunks along each
value dimension being moved
Returns
-------
BoltArraySpark
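        Examples
        --------
        A minimal sketch, assuming ``sc`` is a SparkContext and ``bolt.array``
        for construction (illustrative only)::

            import bolt
            from numpy import arange
            x = bolt.array(arange(24).reshape(2, 3, 4), sc)   # split=1
            y = x.swap((0,), (0,))   # key axis 0 <-> value axis 0
            y.shape                  # (3, 2, 4)
            y.split                  # 1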
"""
kaxes = asarray(tupleize(kaxes), 'int')
vaxes = asarray(tupleize(vaxes), 'int')
if type(size) is not str:
size = tupleize(size)
if len(kaxes) == self.keys.ndim and len(vaxes) == 0:
raise ValueError('Cannot perform a swap that would '
'end up with all data on a single key')
if len(kaxes) == 0 and len(vaxes) == 0:
return self
from bolt.spark.chunk import ChunkedArray
c = ChunkedArray(self._rdd, shape=self._shape, split=self._split, dtype=self._dtype)
chunks = c._chunk(size, axis=vaxes)
swapped = chunks.keys_to_values(kaxes).values_to_keys([v+len(kaxes) for v in vaxes])
barray = swapped.unchunk()
return barray
    def transpose(self, *axes):
        """
        Return an array with the axes transposed.
        This operation will incur a swap unless the
        desired permutation can be obtained
        only by transposing the keys or the values.
Parameters
----------
axes : None, tuple of ints, or n ints
If None, will reverse axis order.
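        Examples
        --------
        A minimal sketch, assuming ``sc`` is a SparkContext and ``bolt.array``
        for construction (illustrative only)::

            import bolt
            from numpy import arange
            x = bolt.array(arange(24).reshape(2, 3, 4), sc)
            x.transpose().shape          # (4, 3, 2), reversed axis order
            x.transpose(0, 2, 1).shape   # (2, 4, 3), values-only permutation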
"""
if len(axes) == 0:
p = arange(self.ndim-1, -1, -1)
else:
p = asarray(argpack(axes))
istransposeable(p, range(self.ndim))
split = self.split
# compute the keys/value axes that need to be swapped
new_keys, new_values = p[:split], p[split:]
swapping_keys = sort(new_values[new_values < split])
swapping_values = sort(new_keys[new_keys >= split])
stationary_keys = sort(new_keys[new_keys < split])
stationary_values = sort(new_values[new_values >= split])
# compute the permutation that the swap causes
p_swap = r_[stationary_keys, swapping_values, swapping_keys, stationary_values]
# compute the extra permutation (p_x) on top of this that
# needs to happen to get the full permutation desired
p_swap_inv = argsort(p_swap)
p_x = p_swap_inv[p]
p_keys, p_values = p_x[:split], p_x[split:]-split
        # perform the swap, then the within-key and within-value permutations
arr = self.swap(swapping_keys, swapping_values-split)
arr = arr.keys.transpose(tuple(p_keys.tolist()))
arr = arr.values.transpose(tuple(p_values.tolist()))
return arr
@property
    def T(self):
"""
Transpose by reversing the order of the axes.
"""
return self.transpose()
    def swapaxes(self, axis1, axis2):
"""
Return the array with two axes interchanged.
Parameters
----------
axis1 : int
The first axis to swap
axis2 : int
The second axis to swap
"""
p = list(range(self.ndim))
p[axis1] = axis2
p[axis2] = axis1
return self.transpose(p)
    def reshape(self, *shape):
"""
Return an array with the same data but a new shape.
Currently only supports reshaping that independently
reshapes the keys, or the values, or both.
Parameters
----------
shape : tuple of ints, or n ints
New shape
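        Examples
        --------
        A minimal sketch, assuming ``sc`` is a SparkContext and ``bolt.array``
        for construction (illustrative only)::

            import bolt
            from numpy import arange
            x = bolt.array(arange(24).reshape(2, 3, 4), sc)   # split=1
            x.reshape(2, 4, 3).shape      # values reshaped: (2, 4, 3)
            x.reshape(2, 2, 2, 3).shape   # values reshaped: (2, 2, 2, 3)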
"""
new = argpack(shape)
isreshapeable(new, self.shape)
if new == self.shape:
return self
i = self._reshapebasic(new)
if i == -1:
raise NotImplementedError("Currently no support for reshaping between "
"keys and values for BoltArraySpark")
else:
new_key_shape, new_value_shape = new[:i], new[i:]
return self.keys.reshape(new_key_shape).values.reshape(new_value_shape)
def _reshapebasic(self, shape):
"""
        Check if the requested reshape can be broken into independent reshapes
on the keys and values. If it can, returns the index in the new shape
separating keys from values, otherwise returns -1
"""
new = tupleize(shape)
old_key_size = prod(self.keys.shape)
old_value_size = prod(self.values.shape)
for i in range(len(new)):
new_key_size = prod(new[:i])
new_value_size = prod(new[i:])
if new_key_size == old_key_size and new_value_size == old_value_size:
return i
return -1
    def squeeze(self, axis=None):
"""
Remove one or more single-dimensional axes from the array.
Parameters
----------
axis : tuple or int
One or more singleton axes to remove.
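        Examples
        --------
        A minimal sketch, assuming ``sc`` is a SparkContext and ``bolt.array``
        for construction (illustrative only)::

            import bolt
            from numpy import arange
            x = bolt.array(arange(12).reshape(1, 3, 4), sc)
            x.squeeze().shape      # (3, 4): singleton axis 0 removed
            x.squeeze(0).shape     # same, removing only axis 0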
"""
if not any([d == 1 for d in self.shape]):
return self
if axis is None:
drop = where(asarray(self.shape) == 1)[0]
elif isinstance(axis, int):
drop = asarray((axis,))
elif isinstance(axis, tuple):
drop = asarray(axis)
else:
raise ValueError("an integer or tuple is required for the axis")
if any([self.shape[i] > 1 for i in drop]):
raise ValueError("cannot select an axis to squeeze out which has size greater than one")
if any(asarray(drop) < self.split):
kmask = set([d for d in drop if d < self.split])
kfunc = lambda k: tuple([kk for ii, kk in enumerate(k) if ii not in kmask])
else:
kfunc = lambda k: k
if any(asarray(drop) >= self.split):
vmask = tuple([d - self.split for d in drop if d >= self.split])
vfunc = lambda v: v.squeeze(vmask)
else:
vfunc = lambda v: v
rdd = self._rdd.map(lambda kv: (kfunc(kv[0]), vfunc(kv[1])))
shape = tuple([ss for ii, ss in enumerate(self.shape) if ii not in drop])
split = len([d for d in range(self.keys.ndim) if d not in drop])
return self._constructor(rdd, shape=shape, split=split).__finalize__(self)
    def astype(self, dtype, casting='unsafe'):
        """
        Cast the array to a specified type.
        Parameters
        ----------
        dtype : str or dtype
            Typecode or data-type to cast the array to (see numpy)
        casting : str, optional, default='unsafe'
            Casting rule to apply, passed to ndarray.astype (see numpy)
        """
rdd = self._rdd.mapValues(lambda v: v.astype(dtype, 'K', casting))
return self._constructor(rdd, dtype=dtype).__finalize__(self)
@property
    def shape(self):
"""
Size of each dimension.
"""
return self._shape
@property
    def size(self):
"""
Total number of elements.
"""
return prod(self._shape)
@property
    def ndim(self):
"""
Number of dimensions.
"""
return len(self._shape)
@property
    def split(self):
"""
Axis at which the array is split into keys/values.
"""
return self._split
@property
    def dtype(self):
"""
Data-type of array.
"""
return self._dtype
@property
def mask(self):
return tuple([1] * len(self.keys.shape) + [0] * len(self.values.shape))
@property
    def keys(self):
        """
        Returns a restricted view onto the key axes.
        """
from bolt.spark.shapes import Keys
return Keys(self)
@property
def values(self):
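        """
        Returns a restricted view onto the value axes.
        """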
from bolt.spark.shapes import Values
return Values(self)
    def tolocal(self):
"""
Returns a local bolt array by first collecting as an array.
"""
from bolt.local.array import BoltArrayLocal
return BoltArrayLocal(self.toarray())
    def toarray(self):
"""
Returns the contents as a local array.
Will likely cause memory problems for large objects.
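        Examples
        --------
        A minimal sketch, assuming ``sc`` is a SparkContext and ``bolt.array``
        for construction (illustrative only)::

            import bolt
            from numpy import arange
            x = bolt.array(arange(24).reshape(2, 3, 4), sc)
            a = x.toarray()    # numpy.ndarray with shape (2, 3, 4)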
"""
x = self._rdd.sortByKey().values().collect()
return asarray(x).reshape(self.shape)
    def tordd(self):
"""
Return the underlying RDD of the bolt array.
"""
return self._rdd
    def display(self):
        """
        Show a pretty-printed representation of the first 10 records of this BoltArraySpark.
"""
for x in self._rdd.take(10):
print(x)