Run Spark jobs¶
Note
This feature is experimental and therefore should not be used in a production environment.
This guide describes how to run Spark jobs from inside Kubeflow Notebooks and in Kubeflow Pipeline steps.
Requirements¶
An active CKF deployment with Apache Spark integration and access to the Kubeflow dashboard. See Integrate with Charmed Apache Spark for more details.
Run Spark job on Kubeflow Notebooks¶
This section describes how to run Spark jobs from inside a Kubeflow notebook environment.
Create a Kubeflow notebook. This notebook is the workspace from which you run commands. When creating the notebook, make sure to:
- Use the Charmed Spark JupyterLab OCI image corresponding to the Apache Spark version of your choice as the notebook image. For the purposes of this guide, we use the image ghcr.io/canonical/charmed-spark-jupyterlab:3.5-22.04_edge@sha256:72a6e89985e35e0920fb40c063b3287425760ebf823b129a87143d5ec0e99af7, which corresponds to Apache Spark 3.5.
- Check the “Configure PySpark for Kubeflow notebooks” option under the “Configurations” section inside “Advanced Options” to apply the necessary PodDefaults to the notebook pod.
Connect to the notebook and start a new Python 3 notebook session from the Launcher.
Once you are inside the notebook, you can access the Spark context through the built-in variable sc. Alternatively, you can import pyspark, which comes pre-installed in the image, and create a Spark context yourself.
To verify that Spark jobs can indeed be run from inside the notebook environment, run the following example code:
rdd = sc.parallelize([1, 2, 3, 4, 5], numSlices=2)
result = rdd.map(lambda x: x * x).collect()
print("Squared values:", result)
You should see an output similar to the following lines:
Squared values: [1, 4, 9, 16, 25]
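To see what this example computes, here is a plain-Python sketch of the same map-and-collect flow. It requires no Spark cluster; the split into numSlices=2 partitions is simulated with a simple list split, which is an illustrative assumption rather than Spark's exact partitioning logic:

```python
# Plain-Python sketch of the RDD example above (no Spark required).
# parallelize(..., numSlices=2) splits the data into two partitions;
# map() applies the function to every element; collect() gathers results.
data = [1, 2, 3, 4, 5]

# Simulate two partitions (roughly how parallelize would split them).
partitions = [data[:3], data[3:]]

# Apply the same squaring lambda to each element of each partition.
mapped = [[x * x for x in part] for part in partitions]

# collect() concatenates the per-partition results into one list.
result = [x for part in mapped for x in part]
print("Squared values:", result)  # Squared values: [1, 4, 9, 16, 25]
```

The per-partition structure is what lets Spark run the map step in parallel across executors before collecting the results back to the driver.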
Run Spark jobs in Kubeflow Pipeline¶
Kubeflow Pipelines provide steps inside which you can run Spark jobs by adding the access-spark-pipeline: true label to a step in the pipeline’s definition.
Create a Kubeflow notebook using the default notebook image. This notebook is the workspace from which you run commands.
Once you are inside the notebook, you can define Kubeflow components that run Spark jobs using pyspark. It is recommended to use the SparkSession context manager from the spark8t Python package, which ships with the Charmed Spark JupyterLab image: it simplifies initialising a Spark session with the necessary configuration and ensures that the session is correctly closed when the context manager exits.
Make sure the KFP SDK is installed in the notebook’s environment so that you can define and configure Kubeflow Pipelines in Python.
!pip install kfp[kubernetes]
If you do not already have a pipeline that runs Spark jobs, use the following code as an example. It creates a pipeline with a single component that runs a trivial Spark job.
Note
Please make sure to use the Charmed Apache Spark image corresponding to the Apache Spark version of your choice.
In the code below (see CHARMED_SPARK_OCI_IMAGE), the image for Apache Spark 3.5 is used.
OCI images corresponding to other Spark versions can be found
here.
from kfp import dsl, kubernetes
CHARMED_SPARK_OCI_IMAGE = "ghcr.io/canonical/charmed-spark:3.5.5-22.04_edge"
@dsl.component(
base_image=CHARMED_SPARK_OCI_IMAGE,
)
def spark_test_component() -> None:
import logging
import os
from spark8t.session import SparkSession
with SparkSession(
app_name="square_numbers",
namespace=os.environ["SPARK_NAMESPACE"],
username=os.environ["SPARK_SERVICE_ACCOUNT"]
) as session:
rdd = session.sparkContext.parallelize(
["spark is fast", "spark is simple", "spark works"],
numSlices=3
)
word_counts = (
rdd
.flatMap(lambda line: line.split())
.map(lambda word: (word, 1))
.reduceByKey(lambda a, b: a + b)
.collect()
)
logging.info(f"Word counts: {word_counts}")
@dsl.pipeline(name="spark-test-pipeline")
def spark_pipeline():
task = spark_test_component()
kubernetes.pod_metadata.add_pod_label(
task,
label_key="access-spark-pipeline",
label_value="true",
)
Note that the access-spark-pipeline: true label has been added to the pipeline task’s pod. This is required for the SPARK_NAMESPACE and SPARK_SERVICE_ACCOUNT environment variables referenced in the component definition above to be available inside the pod.
Submit and run the Pipeline with the following code:
from kfp.client import Client
client = Client()
run = client.create_run_from_pipeline_func(
spark_pipeline,
experiment_name="Test Spark Job",
enable_caching=False,
)
Once the pipeline starts running, navigate to the output Run details. Wait for the run to complete, which usually takes a couple of minutes. Once the run is complete, you should see output similar to the following in its logs:
Word counts: [('is', 2), ('fast', 1), ('simple', 1), ('works', 1), ('spark', 3)]
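As a reference point, the component’s flatMap/map/reduceByKey chain is equivalent to the following plain-Python word count (a sketch using collections.Counter; no Spark required), which produces the same counts:

```python
from collections import Counter

lines = ["spark is fast", "spark is simple", "spark works"]

# flatMap(lambda line: line.split()) -> one flat list of words
words = [word for line in lines for word in line.split()]

# map(lambda word: (word, 1)) + reduceByKey(lambda a, b: a + b)
# -> per-word counts; Counter performs the same reduction locally
word_counts = Counter(words)
print(sorted(word_counts.items()))
```

In Spark, the same reduction happens in parallel: reduceByKey shuffles pairs with the same key to the same partition before summing, which is what makes the operation scale beyond a single machine.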