GCP: running serverless Dataproc with a hello world Python script
PySpark Pi calculation
To get things started, we need some PySpark code. This is a simple pi.py; upload it to your Google Cloud Storage bucket.
import sys
from random import random
from operator import add

from pyspark.sql import SparkSession

if __name__ == "__main__":
    """
    Usage: pi [partitions]
    """
    spark = SparkSession \
        .builder \
        .appName("PythonPi") \
        .getOrCreate()

    partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2
    n = 100000 * partitions

    def f(_: int) -> float:
        x = random() * 2 - 1
        y = random() * 2 - 1
        return 1 if x ** 2 + y ** 2 <= 1 else 0

    count = spark.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
    print("Pi is roughly %f" % (4.0 * count / n))

    spark.stop()
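If you want to sanity-check the Monte Carlo logic before submitting the batch, the same dart-throwing estimator runs in plain Python without Spark. This is just a local sketch; the sample count and seed here are arbitrary choices, not part of the job:

```python
from random import random, seed

def inside_unit_circle(_: int) -> int:
    # Throw a dart at the 2x2 square centred on the origin;
    # score 1 if it lands inside the unit circle.
    x = random() * 2 - 1
    y = random() * 2 - 1
    return 1 if x ** 2 + y ** 2 <= 1 else 0

seed(42)  # fixed seed so the run is repeatable
n = 200000
count = sum(inside_unit_circle(i) for i in range(n))
pi_estimate = 4.0 * count / n
print("Pi is roughly %f" % pi_estimate)
```

The ratio of darts inside the circle to total darts approaches pi/4, which is exactly what the Spark job computes, only distributed across partitions.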
We can easily run PySpark code on Dataproc Serverless using the following command:-
gcloud dataproc batches submit pyspark gs://my-ai-staging-bucket/pi.py --region=asia-southeast1 --batch=my-batch-id-unique
And then in the console, we can see the outputs from this run.
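If you prefer the terminal to the console, the batches surface also supports list and describe subcommands (assuming a reasonably recent gcloud; check `gcloud dataproc batches --help` on your install):

```shell
# List recent batches in the region
gcloud dataproc batches list --region=asia-southeast1

# Inspect one batch's state and configuration
gcloud dataproc batches describe my-batch-id-unique --region=asia-southeast1
```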
Wordcount.py
Next we will take a look at wordcount.py. Upload the code to your GCS bucket, along with a simple data set. For example, I placed mine in my bucket here.
And the wordcount.py looks like this. Looking at the code at spark.read.text, you might be thinking: this usually reads from a local directory, so would a gs:// bucket work? Yes it will; Spark on GCP Dataproc uses the Cloud Storage connector for Hadoop, so gs:// paths work anywhere a file path would.
import sys
from operator import add

from pyspark.sql import SparkSession

if __name__ == "__main__":
    if len(sys.argv) != 2:
        print("Usage: wordcount <file>", file=sys.stderr)
        sys.exit(-1)

    spark = SparkSession \
        .builder \
        .appName("PythonWordCount") \
        .getOrCreate()

    lines = spark.read.text(sys.argv[1]).rdd.map(lambda r: r[0])
    counts = lines.flatMap(lambda x: x.split(' ')) \
                  .map(lambda x: (x, 1)) \
                  .reduceByKey(add)
    output = counts.collect()
    for (word, count) in output:
        print("%s: %i" % (word, count))

    spark.stop()
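The flatMap / map / reduceByKey chain is the classic MapReduce word count. You can check what output to expect locally with plain Python before submitting; this sketch uses a made-up sample line in place of the real data file:

```python
from functools import reduce

sample_lines = ["the quick brown fox", "jumps over the lazy dog the end"]

# flatMap: split each line into words
words = [w for line in sample_lines for w in line.split(' ')]

# map: pair each word with 1, then reduceByKey: sum the 1s per word
counts = {}
for word, one in ((w, 1) for w in words):
    counts[word] = counts.get(word, 0) + one

for word, count in counts.items():
    print("%s: %i" % (word, count))
```

The Spark version does the same thing, except the split, pairing, and per-key summation happen in parallel across partitions, and reduceByKey shuffles matching keys together before summing.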
And to run it :-
gcloud dataproc batches submit pyspark gs://my-ai-staging-bucket/wordcount.py \
--region=asia-southeast1 --batch=word-count-batch1 \
-- gs://my-ai-staging-bucket/cl-cover.txt
And you will get the word count results in the output.
It's that easy to get Spark working, without the hassle of setting up your own cluster.