Usage
Currently this uses the SparkContext addPyFile method to ship this module to the workers.
To ship and install a conda environment, the following steps are needed:
# Create a new conda env to test with, if you don't have one:
conda create -n sparkonda-test-env python=2.7 pip pandas scikit-learn numpy numba
source activate sparkonda-test-env
pip install sparkonda
###########################################################
###########################################################
# In the following example sc represents a running pyspark context
# Please see the tests folder for examples
###########################################################
# Adding the sparkonda_utils module to the spark workers:
###########################################################
###########################################################
####OPTION 1(requires knowledge of the installation location):
sc.addPyFile('path/to/sparkonda_utils.py')
####OPTION 2(tries to detect the installation folder):
# Declare these two helper functions to help with sys.modules cache manipulations
# and to add the sparkonda_utils.py file to the pyspark workers SparkFiles
def add_sparkonda_utils_to_workers(sc):
    """Add the sparkonda_utils module to the Spark workers.

    Helper that delegates to
    ``sparkonda.module_helper.add_module_to_workers`` so the module is
    shipped to the workers and the ``sys.modules`` cache is cleaned up
    afterward.

    :param sc: a running pyspark context
    """
    import sparkonda
    sparkonda.module_helper.add_module_to_workers(sc)
def import_sparkonda_utils():
    """Import the sparkonda_utils module and return it.

    The top-level ``sparkonda_utils`` module is tried first; if that
    import fails, the packaged ``sparkonda.sparkonda_utils`` is used
    instead. The try/except form is a trick that lets IDEs provide
    autocomplete for the returned module.

    :return: the imported sparkonda_utils module
    :raises ImportError: if neither import location is available
    """
    try:
        import sparkonda_utils as skon
    except ImportError:
        import sparkonda.sparkonda_utils as skon
    return skon
add_sparkonda_utils_to_workers(sc)
skon = import_sparkonda_utils()
###########################################################
###########################################################
# Make sure that the skon module points to the SparkFile
# and not to the local sparkonda installation
skon.__file__
# Configure the skon module and use it to deploy your conda env to
# the spark workers
from os.path import expanduser
home_dir = expanduser("~")
# Edit to match your conda env name
skon.CONDA_ENV_NAME = 'sparkonda-test-env'
# Edit this path to match your conda env location
# (built here as <home_dir>/miniconda/envs/<CONDA_ENV_NAME>)
skon.CONDA_ENV_LOCATION = ''.join([home_dir,'/miniconda/envs/',skon.CONDA_ENV_NAME])
# Edit to match the number of spark workers
skon.SC_NUM_EXECUTORS = 2
# Edit to match the number of cores per spark worker
skon.SC_NUM_CORE_PER_EXECUTOR = 2
# The calls below run in this order:
# 1. Pack the conda env
skon.pack_conda_env()
# 2. Ship it to the workers
skon.distribute_conda_env(sc)
# 3. List the worker dirs
skon.list_cwd_files(sc)
# 4. Install the conda env
skon.install_conda_env(sc)
# 5. Set the workers python interpreter
skon.set_workers_python_interpreter(sc)
# Test your setup
# This assumes that pandas and sklearn are installed in the conda env you specified
def check_pandas(x):
    """Return the installed pandas version as a one-element list.

    :param x: not used by this function
    :return: ``[pandas.__version__]``
    :raises ImportError: if pandas cannot be imported
    """
    import pandas
    return [pandas.__version__]
skon.prun(sc, check_pandas, include_broadcast_vars=False)
def check_sklearn(x):
    """Return the installed scikit-learn version as a one-element list.

    :param x: not used by this function
    :return: ``[sklearn.__version__]``
    :raises ImportError: if sklearn cannot be imported
    """
    import sklearn
    return [sklearn.__version__]
skon.prun(sc, check_sklearn, include_broadcast_vars=False)
To remove the custom conda env from the workers and reset the interpreter:
skon.remove_conda_env(sc)
skon.list_cwd_files(sc)
skon.reset_workers_python_interpreter(sc)
# Check that the package is not accessible anymore
# User should get an ImportError:
# ImportError: No module named sklearn
def check_sklearn(x):
    """Return the scikit-learn version as a one-element list.

    Re-declared here for the post-removal check.

    :param x: not used by this function
    :return: ``[sklearn.__version__]``
    :raises ImportError: if sklearn cannot be imported
    """
    import sklearn
    return [sklearn.__version__]
skon.prun(sc, check_sklearn, include_broadcast_vars=False)