
Deploy Spark-Kotlin project on GCP Dataproc cluster

Introduction

In the previous post, “How to create an example Spark-Kotlin-Gradle project?”, I showed you how to create an example Apache Spark project written in Kotlin and built with Gradle.

You can run this project locally, but Spark was designed to process large amounts of data on a big, distributed compute cluster, so to unleash the full potential of this tool you may want to run it in the cloud, for example on Google Cloud Platform.

In this post, I’ll show you how to run the example Spark-Kotlin-Gradle project on GCP using the gcloud command-line tool.

The repository with the example project is available on my GitHub.

Adapt project to GCP

Versions

GCP provides specific Dataproc VM images, which contain particular versions of Java, Scala, Spark, etc.

The newest image is 2.1.2-debian11, which includes:

  • Apache Spark 3.3.0
  • Java 11
  • Scala 2.12.14

You can see the list of available component versions here.

Because of that, you have to set matching versions of Java and the dependencies in build.gradle.kts (all of them are shown together in the sketch after this list):

  • JVM version 11 (jvmToolchain(11))
  • Apache Spark version 3.3.0 with Scala 2.12 (compileOnly("org.apache.spark:spark-sql_2.12:3.3.0"))
  • Kotlin Spark API (implementation("org.jetbrains.kotlinx.spark:kotlin-spark-api_3.3.0_2.12:1.2.3"))
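For context, below is a minimal build.gradle.kts sketch that combines these settings. It is not the complete build file from the repository: the plugin versions are illustrative assumptions, and the Shadow plugin is included because the shadowJar task is used later for packaging.

plugins {
  // Plugin versions are assumptions for illustration; use the ones from your project.
  kotlin("jvm") version "1.8.20"
  id("com.github.johnrengelman.shadow") version "7.1.2"
}

repositories {
  mavenCentral()
}

kotlin {
  // Dataproc image 2.1.2-debian11 ships Java 11
  jvmToolchain(11)
}

dependencies {
  // Spark 3.3.0 with Scala 2.12 to match the Dataproc image; provided by the cluster at runtime
  compileOnly("org.apache.spark:spark-sql_2.12:3.3.0")
  // Kotlin Spark API built for Spark 3.3.0 and Scala 2.12
  implementation("org.jetbrains.kotlinx.spark:kotlin-spark-api_3.3.0_2.12:1.2.3")
  // Spark BigQuery Connector (covered in the next section)
  implementation("com.google.cloud.spark:spark-bigquery_2.12:0.28.0")
}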

Spark BigQuery Connector

To use GCP tools like BigQuery, Google Cloud Storage, etc., from Apache Spark, you should use an additional library: the Spark BigQuery Connector.

It has a very well-written README.md, so I encourage you to familiarize yourself with it.

You have to add the dependency in a version matching your Spark and Scala versions: implementation("com.google.cloud.spark:spark-bigquery_2.12:0.28.0")
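The example job in this post only reads from and writes to Cloud Storage, but with the connector on the classpath you can also read BigQuery tables directly. A minimal, hypothetical sketch (the table name below is a placeholder, and spark is the SparkSession available inside the withSpark block shown later):

// Sketch only: reads a BigQuery table through the Spark BigQuery Connector.
// "my-project.my_dataset.my_table" is a hypothetical placeholder.
val measurements = spark
  .read()
  .format("bigquery")
  .option("table", "my-project.my_dataset.my_table")
  .load()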

Example code & data

In the example project, I want to show you how to read and write data using a Google Cloud Storage bucket. I downloaded publicly available data of ESA air pollution measurements in Poland and put it into a CSV file, which you can copy into a bucket and load in Spark.
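The loader shown later reads this file with a semicolon delimiter and treats the first row as a header whose column names must match the DTO field names, so the header is assumed to look roughly like this (illustrative, not copied from the actual file):

school_name;school_street;school_post_code;school_city;school_longitude;school_latitude;data_humidity_avg;data_pressure_avg;data_temperature_avg;data_pm10_avg;data_pm25_avg;timestamp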

Let’s look at the example code, starting with the data containers.

CSV is a flat file, so you can load the data into a simple DTO class:

data class MeasurementDto(
  val school_name: String,
  val school_street: String?,
  val school_post_code: String,
  val school_city: String,
  val school_longitude: String,
  val school_latitude: String,
  val data_humidity_avg: String,
  val data_pressure_avg: String,
  val data_temperature_avg: String,
  val data_pm10_avg: String,
  val data_pm25_avg: String,
  val timestamp: String
) : Serializable

which you can map into richer domain models:

data class Measurement(
  val school: School,
  val data: MeasurementData,
  val timestamp: Timestamp
) : Serializable

data class School(
  val name: String,
  val street: String?,
  val postCode: String,
  val city: String,
  val longitude: Double,
  val latitude: Double
) : Serializable

data class MeasurementData(
  val humidityAverage: Double,
  val pressureAverage: Double,
  val temperatureAverage: Double,
  val pm10Average: Double,
  val pm25Average: Double
) : Serializable

using the following mapping function:

data class MeasurementDto(
  ...
) : Serializable {
  fun toDomainObject(): Measurement = Measurement(
    school = School(
      name = school_name,
      street = school_street,
      postCode = school_post_code,
      city = school_city,
      longitude = school_longitude.toDouble(),
      latitude = school_latitude.toDouble()
    ),
    data = MeasurementData(
      humidityAverage = data_humidity_avg.toDouble(),
      pressureAverage = data_pressure_avg.toDouble(),
      temperatureAverage = data_temperature_avg.toDouble(),
      pm10Average = data_pm10_avg.toDouble(),
      pm25Average = data_pm25_avg.toDouble()
    ),
    timestamp = Timestamp.valueOf(timestamp)
  )
}
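Note that Timestamp.valueOf expects the JDBC timestamp format yyyy-[m]m-[d]d hh:mm:ss[.f...], so the timestamp column has to follow it. As a quick, illustrative sanity check of the mapping (the values below are made up, not taken from the real dataset), you could run something like:

fun main() {
  // Illustrative values only; real rows come from the ESA measurements CSV.
  val dto = MeasurementDto(
    school_name = "Example School",
    school_street = null,
    school_post_code = "00-001",
    school_city = "Warszawa",
    school_longitude = "21.0122",
    school_latitude = "52.2297",
    data_humidity_avg = "45.5",
    data_pressure_avg = "1013.2",
    data_temperature_avg = "21.3",
    data_pm10_avg = "18.7",
    data_pm25_avg = "12.4",
    timestamp = "2023-01-01 12:00:00"
  )
  // Prints the mapped Measurement; throws IllegalArgumentException if the timestamp format is wrong.
  println(dto.toDomainObject())
}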

Now, you can look at the code executed by Spark:

fun main() {
  withSpark(appName = "Measurements per city counter") {
    val inputPath = "gs://example-data/esa_air_pollution_measurements_poland.csv"
    val resultPath = "gs://example-data/measurements_per_city.csv"
    println("Starting job...")
    spark
      .read()
      .setDelimiter(";")
      .firstRowAsHeader()
      .csv(inputPath)
      .`as`<MeasurementDto>()
      .map { it.toDomainObject() }
      .groupByKey { it.school.city }
      .count()
      .repartition(1)
      .write()
      .csv(resultPath)
    println("Job done.")
  }
}

private fun DataFrameReader.setDelimiter(delimiter: String) = this.option("delimiter", delimiter)
private fun DataFrameReader.firstRowAsHeader() = this.option("header", "true")

The job is simple:

  • Reads data from the Google Cloud Storage bucket into the DTO class
  • Maps it to the domain model
  • Groups it by city
  • Counts the elements in each group
  • Saves the result in GCS

Deployment steps

  1. First of all, you must have a package ready to be deployed:

./gradlew clean && ./gradlew shadowJar

  2. Secondly, you must have a place on GCP to share this package and upload the data. For that, you have to create your own Google Cloud Storage bucket (something like a folder in the cloud) and upload the package & data there:

gsutil mb gs://example-data
gsutil cp build/libs/spark-kotlin-on-gcp-1.0-SNAPSHOT-all.jar gs://example-data/
gsutil cp data/esa_air_pollution_measurements_poland.csv gs://example-data/

  3. Thirdly, Apache Spark requires compute instances to run on. GCP provides a managed service for creating such clusters, called Dataproc:

gcloud dataproc clusters create my-cluster --region europe-west1 --image-version 2.1.2-debian11

  4. Now, you are ready to submit the Apache Spark job to run on the created cluster:

gcloud dataproc jobs submit spark --cluster my-cluster --region europe-west1 --jars gs://example-data/spark-kotlin-on-gcp-1.0-SNAPSHOT-all.jar --class pl.jakubpradzynski.measurements.MeasurementsPerCityCounterJobKt

  5. When the job is done, you can clean up on GCP:

gcloud -q dataproc clusters delete my-cluster --region europe-west1
gsutil rm -r gs://example-data

All those steps are available in a single shell script, which you can check here.

Summary

If you want to read more about these tools, I refer you to their official documentation.

You can use my example project from GitHub: Example Spark-Kotlin-Gradle project.

Above, I only described the simple happy path of deployment. It’s worth exploring more: different or more complex Dataproc cluster setups, cost optimizations, more complex Spark jobs, or Spark optimization. In the future, I plan to cover some of those topics.

This post is licensed under CC BY 4.0 by the author.
