GCP: running Serverless Dataproc with a hello world Python script

PySpark Pi calculation

To get things started, we need some PySpark code. Here is a simple pi.py; upload it to your Google Cloud Storage bucket.

import sys
from random import random
from operator import add

from pyspark.sql import SparkSession


if __name__ == "__main__":
    """
        Usage: pi [partitions]
    """
    spark = SparkSession\
        .builder\
        .appName("PythonPi")\
        .getOrCreate()

    partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2
    n = 100000 * partitions

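    def f(_: int) -> int:
        # Pick a random point in the square [-1, 1] x [-1, 1] and
        # return 1 if it lands inside the unit circle, else 0.
        x = random() * 2 - 1
        y = random() * 2 - 1
        return 1 if x ** 2 + y ** 2 <= 1 else 0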

    count = spark.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
    print("Pi is roughly %f" % (4.0 * count / n))

    spark.stop()
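The estimate works because points drawn uniformly from the square [-1, 1] x [-1, 1] land inside the unit circle with probability pi/4 (circle area pi divided by square area 4), so pi is roughly 4 * count / n. To see this without a cluster, here is a minimal plain-Python sketch of the same map-and-reduce logic. The estimate_pi and inside_circle names and the fixed seed are illustrative choices, not part of pi.py.

```python
import random
from functools import reduce
from operator import add
from typing import Optional


def inside_circle(_: int) -> int:
    """Same test as f() in pi.py: 1 if a random point in [-1, 1] x [-1, 1] is in the unit circle."""
    x = random.random() * 2 - 1
    y = random.random() * 2 - 1
    return 1 if x ** 2 + y ** 2 <= 1 else 0


def estimate_pi(n: int, seed: Optional[int] = None) -> float:
    """Estimate pi from n random points, mirroring pi.py's map(f).reduce(add) on one machine."""
    if seed is not None:
        random.seed(seed)  # fixed seed makes the run repeatable
    count = reduce(add, map(inside_circle, range(1, n + 1)))
    # circle area / square area = pi / 4, so pi is about 4 * hits / samples
    return 4.0 * count / n


if __name__ == "__main__":
    # 2 partitions x 100000 points: the same default n as pi.py
    print("Pi is roughly %f" % estimate_pi(200_000, seed=42))
```

More points shrink the random error, which is what the partitions argument buys you in the Spark version: more work spread across executors.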

We can then run the PySpark script on Dataproc Serverless with the following command:

gcloud dataproc batches submit pyspark gs://my-ai-staging-bucket/pi.py --region=asia-southeast1 --batch=my-batch-id-unique


Then, in the Google Cloud console, we can see the output from this run.
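The batch ID in the command is named my-batch-id-unique for a reason: a batch ID has to be unique within the project and region, so reusing one from an earlier run is expected to be rejected. A small helper can generate a fresh ID for every submission. This is a sketch: the unique_batch_id name is hypothetical, and the format rule it checks (lowercase letters, digits and hyphens, starting with a letter, 4 to 63 characters) is an assumption to confirm against the Dataproc Serverless documentation.

```python
import re
import uuid
from datetime import datetime, timezone

# Assumed format rule (conservative): starts with a letter, then lowercase
# letters, digits or hyphens, ends with a letter or digit, 4-63 chars total.
_VALID_ID = re.compile(r"^[a-z][a-z0-9-]{2,61}[a-z0-9]$")


def unique_batch_id(prefix: str = "pi") -> str:
    """Return an ID such as 'pi-20240101-120000-1a2b3c' that is very unlikely to repeat."""
    # Lowercase the prefix and turn anything outside [a-z0-9-] into '-'.
    clean = re.sub(r"[^a-z0-9-]", "-", prefix.lower()).strip("-")
    if not clean[:1].isalpha():
        clean = ("batch-" + clean).rstrip("-")
    stamp = datetime.now(timezone.utc).strftime("%Y%m%d-%H%M%S")  # 15 chars
    suffix = uuid.uuid4().hex[:6]  # random part so IDs differ within one second
    batch_id = f"{clean[:40]}-{stamp}-{suffix}"  # at most 40 + 1 + 15 + 1 + 6 = 63
    if not _VALID_ID.match(batch_id):
        raise ValueError(f"generated an invalid batch id: {batch_id}")
    return batch_id


if __name__ == "__main__":
    print(unique_batch_id("pi"))
```

The printed value can be passed to --batch= in place of my-batch-id-unique.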



wordcount.py

Next, let's look at wordcount.py. Upload the script to your Cloud Storage bucket along with a simple dataset. For example, I put a text file, cl-cover.txt, in the same bucket.
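Both scripts and the data file need to be in Cloud Storage before you submit anything. If you prefer to script the upload, here is a minimal sketch that wraps gcloud storage cp, assuming the gcloud CLI is installed and authenticated. The upload_to_gcs helper is hypothetical, and it only prints the command unless dry_run=False.

```python
import os
import shlex
import subprocess
from typing import List


def upload_to_gcs(local_path: str, bucket: str, dry_run: bool = True) -> List[str]:
    """Copy a local file to the root of a Cloud Storage bucket with `gcloud storage cp`."""
    dest = f"gs://{bucket}/{os.path.basename(local_path)}"
    cmd = ["gcloud", "storage", "cp", local_path, dest]
    # Always show the command; only execute it when dry_run is False.
    print(shlex.join(cmd))
    if not dry_run:
        subprocess.run(cmd, check=True)
    return cmd


if __name__ == "__main__":
    for path in ["wordcount.py", "cl-cover.txt"]:
        upload_to_gcs(path, "my-ai-staging-bucket")
```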


Here is wordcount.py. Looking at spark.read.text, you might expect it to read only from a local directory, so would a gs:// path work? Yes: Spark on Dataproc comes with the Cloud Storage connector for Hadoop, so spark.read.text can read gs:// paths directly.



import sys
from operator import add

from pyspark.sql import SparkSession


if __name__ == "__main__":
    if len(sys.argv) != 2:
        print("Usage: wordcount <file>", file=sys.stderr)
        sys.exit(-1)

    spark = SparkSession\
        .builder\
        .appName("PythonWordCount")\
        .getOrCreate()

    lines = spark.read.text(sys.argv[1]).rdd.map(lambda r: r[0])
    counts = lines.flatMap(lambda x: x.split(' ')) \
                  .map(lambda x: (x, 1)) \
                  .reduceByKey(add)
    output = counts.collect()
    for (word, count) in output:
        print("%s: %i" % (word, count))

    spark.stop()
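The three RDD steps are the classic map-reduce word count: flatMap splits every line into words, map pairs each word with 1, and reduceByKey(add) sums the 1s for each distinct word. The sketch below reproduces that logic in plain Python, with no Spark, so you can check the expected counts on a small sample locally. word_count is a hypothetical helper. Note that split(' '), as used in wordcount.py, turns an empty line or a double space into an empty-string token, which then gets counted as well.

```python
from operator import add
from typing import Dict, Iterable


def word_count(lines: Iterable[str]) -> Dict[str, int]:
    """Count words the same way wordcount.py does, but in plain Python."""
    # flatMap(lambda x: x.split(' ')): one stream of tokens from all lines
    words = (w for line in lines for w in line.split(' '))
    # map(lambda x: (x, 1)) followed by reduceByKey(add): sum the 1s per word
    counts: Dict[str, int] = {}
    for word, one in ((w, 1) for w in words):
        counts[word] = add(counts.get(word, 0), one)
    return counts


if __name__ == "__main__":
    sample = ["the quick brown fox", "the lazy dog", ""]
    for word, count in word_count(sample).items():
        print("%s: %i" % (word, count))
```

Using line.split() (no argument) instead would drop those empty tokens, in both this sketch and the Spark script.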


To run it, put the input file path after a bare --; gcloud passes everything after it to the script as arguments:


gcloud dataproc batches submit pyspark gs://my-ai-staging-bucket/wordcount.py \
--region=asia-southeast1 --batch=word-count-batch1 \
-- gs://my-ai-staging-bucket/cl-cover.txt

The job then prints each word in the file together with its count.
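Everything after the bare -- in the submit command is handed to the script, which is why wordcount.py finds the file path in sys.argv[1]. If you submit jobs from Python rather than the shell, the same command can be assembled as an argument list. This is a sketch assuming the gcloud CLI is installed and authenticated; batch_submit_cmd and submit are hypothetical helper names, and submit only prints the command unless dry_run=False.

```python
import shlex
import subprocess
from typing import List, Sequence


def batch_submit_cmd(script_uri: str, region: str, batch_id: str,
                     script_args: Sequence[str] = ()) -> List[str]:
    """Assemble a `gcloud dataproc batches submit pyspark` command line."""
    cmd = ["gcloud", "dataproc", "batches", "submit", "pyspark", script_uri,
           f"--region={region}", f"--batch={batch_id}"]
    if script_args:
        # gcloud flags go before '--'; the script's own arguments go after it
        # and arrive in the driver as sys.argv[1:].
        cmd += ["--", *script_args]
    return cmd


def submit(cmd: List[str], dry_run: bool = True) -> None:
    """Print the command and run it only when dry_run is False."""
    print(shlex.join(cmd))
    if not dry_run:
        subprocess.run(cmd, check=True)


if __name__ == "__main__":
    submit(batch_submit_cmd(
        "gs://my-ai-staging-bucket/wordcount.py",
        region="asia-southeast1",
        batch_id="word-count-batch1",
        script_args=["gs://my-ai-staging-bucket/cl-cover.txt"],
    ))
```

Building a list instead of a single string avoids shell quoting problems when paths or arguments contain spaces.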


It's that easy to get Spark working on Dataproc Serverless, without the hassle of setting up and managing your own cluster.





