Kedro allows integration with pyspark as described in kedro documentation. kedro-airflow-k8s allows running such projects with Airflow, using the external Spark cluster as a computation environment. Not every pipeline node is executed on spark cluster, but only the ones which require spark environment.
In order to make this happen, the following applies. Firstly plugin detects if any of the kedro nodes are pyspark related. All such nodes are logically grouped in a way that dependencies between all nodes in the pipeline are maintained. Such construction keeps data management between kedro nodes local within the cluster for performance matter, while enforcing order correctness. Secondly, plugin creates spark submit nodes inside the DAG to reflect spark related tasks, grouped in the previous step. Additionally, the cluster create and delete operator is setup so that the dedicated spark instance is ready for the sake of given job run. As the last step, the artifacts required by spark, namely cluster initialization shell script, project archive and kedro run python script are prepared.
spark configuration is a part of a
run_config. This plugin supports Google Dataproc, but it’s also possible
to provide custom operators via external factory.
To configure Dataproc with the project, set
cluster_config to provide dictionary that describes the cluster as required
by Airflow Dataproc operators.
Checking with Google Dataproc REST API is helpful.
It’s possible to execute kedro spark jobs on K8S. In this case, there’s no external cluster that’s started for the
purpose of the task. Instead, user has to provide image, containing both spark and kedro project. Usually such image
is created with
docker-image-tool.sh, which is a part of spark distribution. Example image building process may
look like this:
cp -r $SPARK_HOME /tmp
cp -r . /tmp/spark-3.1.2-bin-hadoop3.2
/tmp/spark-3.1.2-bin-hadoop3.2/bin/docker-image-tool.sh -t $COMMIT_SHA -p Dockerfile -r $REGISTRY_IMAGE -b java_image_tag=14-slim build
There’s a manual work to be done with the
Dockerfile first. One approach is to use one of the templates provided by
spark distribution and merge it with the
Dockerfile of kedro project. It’s important that image contain kedro project
with all of it’s contents and it’s installed as a package on a system level together with all the requirements. Any
supplementary jars required by the project can be included as well, unless should be fetched during the job execution
from the external location.
It’s also required to provide runner script inside the image. This script is provided as an entry point to
The script should accept
--env kedro application argument,
--node as a comma-separated list of kedro nodes
names to be executed and
It should initialize kedro session and run project with given arguments.
Dockerfileshould execute spark entrypoint on start. Script is provided as an argument to
Example script template is provided inside the plugin sources in
The script is delegating invocation directly to kedro.
In order to provide one’s own operators it’s sufficient to mark
run_config.spark.operator_factory with the name of the custom class that acts as the operator factory.
The class has to be available on the path when executing
kedro airflow-k8s commands.
The easiest way to start is to derive from
The following methods have to be provided:
create_cluster_operator - returns string with the create cluster operator
delete_cluster_operator - returns string with the delete cluster operator
submit_operator - returns string with the submit job operator
imports_statement - returns string with the full import statement of all required items from the previous methods
Custom initialization script
run_config.spark.user_init_path allows configuring the way the cluster is initialized. Plugin delivers
initialization script that’s aligned with the project artifacts. The script can be prepended with custom
logic, to support the cases like custom package repository setup.
allows appending to initialization script part of the shell script.
Scripts can use environment variable
PROJECT_HOME in order to refer to project location on the cluster.
It’s required the paths to be relative to the project
Detection of spark nodes
As the part of the plugin’s process is to detect spark based nodes, the following rules apply:
if the node is tagged with
kedro-airflow-k8s:group:pysparkit’s considered as a spark node - this allows arbitrary user selection of node to be executed by spark
if any of the node’s input or output is of type
pyspark.sql.dataframe.DataFrameit’s considered as a spark node - detection happens based on the type hints
if any of the node’s input or output is present in the data catalog as one of the
SparkJDBCDataSetit’s considered as a spark node
if none of the above applies, but logical group of spark nodes provide data as input to the node and the node provides the data as the input to the group it’s considered as a spark node
if none of the above applies, the node is considered as the
defaultand it’s put into DAG as usual