from __future__ import print_function
from numpy import ndarray, asarray, ufunc, prod
from bolt.base import BoltArray
from bolt.utils import inshape, tupleize
from functools import reduce
class BoltArrayLocal(ndarray, BoltArray):
    """
    Local implementation of the BoltArray interface backed by a NumPy ndarray.

    Instances are ndarray views tagged with ``_mode = 'local'``, so every
    ndarray operation remains available alongside the functional operators
    (``map``, ``filter``, ``reduce``) defined here.
    """

    def __new__(cls, array):
        # view the input as this subclass (no copy when it is already an ndarray)
        obj = asarray(array).view(cls)
        obj._mode = 'local'
        return obj

    def __array_finalize__(self, obj):
        # obj is None when the array is created through an explicit constructor
        # call; in that case there is nothing to inherit
        if obj is None:
            return
        self._mode = getattr(obj, 'mode', None)

    def __array_wrap__(self, obj, context=None, return_scalar=False):
        # ``context`` and ``return_scalar`` are accepted (and ignored) so that the
        # method matches the signature NumPy >= 2 uses when calling it; older
        # NumPy versions that pass fewer arguments keep working via the defaults.
        if obj.shape == ():
            # unwrap zero-dimensional results into plain scalars
            return obj[()]
        else:
            return ndarray.__array_wrap__(self, obj)

    @property
    def _constructor(self):
        return BoltArrayLocal

    def _align(self, axes, key_shape=None):
        """
        Align local bolt array so that axes for iteration are in the keys.

        This operation is applied before most functional operators.
        It ensures that the specified axes are valid, and might transpose/reshape
        the underlying array so that the functional operators can be applied
        over the correct records.

        Parameters
        ----------
        axes: tuple[int] or list[int]
            One or more axes that will be iterated over by a functional operator

        key_shape: list[int], optional, default=None
            Shape of the key axes. When omitted (or empty) it is derived from
            ``self.shape`` and ``axes``.

        Returns
        -------
        BoltArrayLocal
        """
        # ensure that the key axes are valid for an ndarray of this shape
        inshape(self.shape, axes)

        # accept any sequence of axes; list concatenation below needs a list
        axes = list(axes)

        # compute the set of dimensions/axes that will be used to reshape
        remaining = [dim for dim in range(len(self.shape)) if dim not in axes]
        key_shape = key_shape if key_shape else [self.shape[axis] for axis in axes]
        remaining_shape = [self.shape[axis] for axis in remaining]

        # prod() of an empty sequence is the float 1.0, which reshape rejects,
        # so force a plain int for the linearized key dimension
        linearized_shape = [int(prod(key_shape))] + remaining_shape

        # compute the transpose permutation
        transpose_order = axes + remaining

        # transpose the array so that the keys being mapped over come first,
        # then linearize keys
        reshaped = self.transpose(*transpose_order).reshape(*linearized_shape)

        return reshaped

    def filter(self, func, axis=(0,)):
        """
        Filter array along an axis.

        Applies a function which should evaluate to boolean,
        along a single axis or multiple axes. Array will be
        aligned so that the desired set of axes are in the
        keys, which may require a transpose/reshape.

        Parameters
        ----------
        func : function
            Function to apply, should return boolean

        axis : tuple or int, optional, default=(0,)
            Axis or multiple axes to filter along.

        Returns
        -------
        BoltArrayLocal
        """
        axes = sorted(tupleize(axis))
        reshaped = self._align(axes)

        # ``filter`` here is the builtin: method names defined in the class body
        # are not visible as bare names inside a method
        filtered = asarray(list(filter(func, reshaped)))

        return self._constructor(filtered)

    def map(self, func, axis=(0,)):
        """
        Apply a function across an axis.

        Array will be aligned so that the desired set of axes
        are in the keys, which may require a transpose/reshape.

        Parameters
        ----------
        func : function
            Function of a single array to apply

        axis : tuple or int, optional, default=(0,)
            Axis or multiple axes to apply function along.

        Returns
        -------
        BoltArrayLocal
        """
        axes = sorted(tupleize(axis))
        key_shape = [self.shape[axis] for axis in axes]
        reshaped = self._align(axes, key_shape=key_shape)

        # ``map`` here is the builtin (see the note in ``filter``)
        mapped = asarray(list(map(func, reshaped)))

        # shape of a single mapped record; taken from the stacked result rather
        # than from mapped[0] so that an array with zero records does not raise
        elem_shape = mapped.shape[1:]

        # invert the previous reshape operation, using the shape of the map result
        linearized_shape_inv = key_shape + list(elem_shape)
        reordered = mapped.reshape(*linearized_shape_inv)

        return self._constructor(reordered)

    def reduce(self, func, axis=0):
        """
        Reduce an array along an axis.

        Applies an associative/commutative function of two arguments
        cumulatively to all arrays along an axis. Array will be aligned
        so that the desired set of axes are in the keys, which may
        require a transpose/reshape.

        Parameters
        ----------
        func : function
            Function of two arrays that returns a single array

        axis : tuple or int, optional, default=0
            Axis or multiple axes to reduce along.

        Returns
        -------
        BoltArrayLocal

        Raises
        ------
        ValueError
            If the reduced array does not have the shape obtained by dropping
            the reduced axes from this array's shape.
        """
        axes = sorted(tupleize(axis))

        # if the function is a ufunc, it can automatically handle reducing over
        # multiple axes
        if isinstance(func, ufunc):
            inshape(self.shape, axes)
            reduced = func.reduce(self, axis=tuple(axes))
        else:
            reshaped = self._align(axes)
            # ``reduce`` here is functools.reduce imported at module level
            reduced = reduce(func, reshaped)

        new_array = self._constructor(reduced)

        # ensure that the shape of the reduced array is valid
        expected_shape = [self.shape[i] for i in range(len(self.shape)) if i not in axes]
        if new_array.shape != tuple(expected_shape):
            raise ValueError("reduce did not yield a BoltArray with valid dimensions")

        return new_array

    def first(self):
        """
        Return first element of the array
        """
        return self[0]

    def concatenate(self, arry, axis=0):
        """
        Join this array with another array.

        Parameters
        ----------
        arry : ndarray or BoltArrayLocal
            Another array to concatenate with

        axis : int, optional, default=0
            The axis along which arrays will be joined.

        Returns
        -------
        BoltArrayLocal

        Raises
        ------
        ValueError
            If ``arry`` is not an ndarray (or ndarray subclass).
        """
        if isinstance(arry, ndarray):
            # imported lazily, as in the rest of this class's bolt imports
            from bolt import concatenate
            return concatenate((self, arry), axis)
        else:
            raise ValueError("other must be local array, got %s" % type(arry))

    def toscalar(self):
        """
        Returns the single scalar element contained in an array of shape (), if
        the array has that shape. Returns self otherwise.
        """
        if self.shape == ():
            return self.toarray().reshape(1)[0]
        else:
            return self

    def tospark(self, sc, axis=0):
        """
        Converts a BoltArrayLocal into a BoltArraySpark

        Parameters
        ----------
        sc : SparkContext
            The SparkContext which will be used to create the BoltArraySpark

        axis : tuple or int, optional, default=0
            The axis (or axes) across which this array will be parallelized

        Returns
        -------
        BoltArraySpark
        """
        from bolt import array
        return array(self.toarray(), sc, axis=axis)

    def tordd(self, sc, axis=0):
        """
        Converts a BoltArrayLocal into an RDD

        Parameters
        ----------
        sc : SparkContext
            The SparkContext which will be used to create the BoltArraySpark

        axis : tuple or int, optional, default=0
            The axis (or axes) across which this array will be parallelized

        Returns
        -------
        RDD[(tuple, ndarray)]
        """
        from bolt import array
        return array(self.toarray(), sc, axis=axis).tordd()

    def toarray(self):
        """
        Returns the underlying ndarray wrapped by this BoltArrayLocal
        """
        return asarray(self)

    def display(self):
        """
        Show a pretty-printed representation of this BoltArrayLocal
        """
        print(str(self))

    def __repr__(self):
        return BoltArray.__repr__(self)