GCP: running serverless Dataproc with a hello-world Python script

PySpark Pi calculation

To get things started, we need some PySpark code. This is a simple pi.py; write it locally and then upload it to your Google Cloud Storage bucket.
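Assuming you have the gcloud SDK installed and a bucket named gs://my-ai-staging-bucket (the bucket used throughout this post; substitute your own), the upload is a one-liner:

```shell
# Copy the local pi.py to the staging bucket used in this post
gsutil cp pi.py gs://my-ai-staging-bucket/pi.py
```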

import sys
from random import random
from operator import add

from pyspark.sql import SparkSession


if __name__ == "__main__":
    """
        Usage: pi [partitions]
    """
    spark = SparkSession\
        .builder\
        .appName("PythonPi")\
        .getOrCreate()

    partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2
    n = 100000 * partitions

    def f(_: int) -> float:
        x = random() * 2 - 1
        y = random() * 2 - 1
        return 1 if x ** 2 + y ** 2 <= 1 else 0

    count = spark.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
    print("Pi is roughly %f" % (4.0 * count / n))

    spark.stop()
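Before submitting anything to Dataproc, you can sanity-check the Monte Carlo logic locally with plain Python. This is a rough sketch that mirrors the map/reduce in pi.py without Spark; the fixed seed is just for reproducibility and is not part of the original script:

```python
# Local sanity check of the Monte Carlo pi estimator (no Spark needed).
from random import random, seed

def inside_unit_circle(_: int) -> int:
    # Sample a point in the square [-1, 1] x [-1, 1] and count it
    # if it falls inside the unit circle.
    x = random() * 2 - 1
    y = random() * 2 - 1
    return 1 if x ** 2 + y ** 2 <= 1 else 0

seed(42)        # fixed seed so the run is reproducible
n = 200_000     # same sample count as pi.py with 2 partitions
count = sum(inside_unit_circle(i) for i in range(n))
pi_estimate = 4.0 * count / n
print("Pi is roughly %f" % pi_estimate)
```

With 200,000 samples the estimate should land close to 3.14, just as the Dataproc batch will print.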

We can easily run PySpark code in Dataproc Serverless using the following command:

gcloud dataproc batches submit pyspark gs://my-ai-staging-bucket/pi.py --region=asia-southeast1 --batch=my-batch-id-unique


And then in the Google Cloud console, we can see the output from this run.
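If you would rather stay in the terminal, the gcloud CLI can also report on the batch. A quick sketch, assuming the same region and batch ID as the submit command above:

```shell
# List recent batches in the region
gcloud dataproc batches list --region=asia-southeast1

# Show details (state, links to logs) for our batch
gcloud dataproc batches describe my-batch-id-unique --region=asia-southeast1

# Block until the batch finishes and stream the driver output
gcloud dataproc batches wait my-batch-id-unique --region=asia-southeast1
```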



Wordcount.py

Next we will take a look at wordcount.py. Upload this code to your gs bucket, along with a simple data set. For example, I placed mine in my bucket.
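Assuming the same gs://my-ai-staging-bucket bucket as before, and using cl-cover.txt (the sample text file referenced in the run command later in this post) as the data set, the uploads look like:

```shell
# Upload the script and a sample text file to the staging bucket
gsutil cp wordcount.py gs://my-ai-staging-bucket/wordcount.py
gsutil cp cl-cover.txt gs://my-ai-staging-bucket/cl-cover.txt
```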


And wordcount.py looks like this. Looking at the code, you might wonder about spark.read.text: doesn't that read from a local directory, and would a gs bucket path work? Yes it will: Spark in GCP Dataproc uses the Hadoop Cloud Storage connector, so it can read gs:// paths directly.



import sys
from operator import add

from pyspark.sql import SparkSession


if __name__ == "__main__":
    if len(sys.argv) != 2:
        print("Usage: wordcount <file>", file=sys.stderr)
        sys.exit(-1)

    spark = SparkSession\
        .builder\
        .appName("PythonWordCount")\
        .getOrCreate()

    lines = spark.read.text(sys.argv[1]).rdd.map(lambda r: r[0])
    counts = lines.flatMap(lambda x: x.split(' ')) \
                  .map(lambda x: (x, 1)) \
                  .reduceByKey(add)
    output = counts.collect()
    for (word, count) in output:
        print("%s: %i" % (word, count))

    spark.stop()
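The flatMap/map/reduceByKey pipeline above can be sanity-checked locally with a plain Python Counter before submitting the batch; the sample lines here are made up for illustration:

```python
# Quick local check of the word-count logic, using a Counter instead
# of Spark RDDs. The per-word counts should match what the Dataproc
# batch prints for the same input.
from collections import Counter

lines = [
    "the quick brown fox",
    "the lazy dog",
]
# flatMap(split) -> map((word, 1)) -> reduceByKey(add), collapsed
# into a single Counter over all words.
counts = Counter(word for line in lines for word in line.split(' '))
for word, count in counts.items():
    print("%s: %i" % (word, count))
```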


And to run it:


gcloud dataproc batches submit pyspark gs://my-ai-staging-bucket/wordcount.py \
--region=asia-southeast1 --batch=word-count-batch1 \
-- gs://my-ai-staging-bucket/cl-cover.txt

And you will get word count results in the output.


It's so easy to get Spark working without the hassle of setting up your own cluster.





