Databricks Delta Live Tables intro

A Delta Live Table is a declarative way to process data in a pipeline. You essentially write the code in a notebook, and Databricks generates a pipeline from it.
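
At its simplest, each step is just a Python function that returns a DataFrame, decorated with @dlt.table or @dlt.view; when the notebook is attached to a Delta Live Tables pipeline, the framework materializes the result as a table or view. A minimal sketch (the function name my_minimal_table is made up for illustration):

import dlt

@dlt.table(comment="A minimal example table.")
def my_minimal_table():
  # DLT materializes whatever DataFrame this function returns
  return spark.read.csv("/databricks-datasets/retail-org/customers/customers.csv", header=True)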

Let's say you have the following, more complete code in a notebook.

Customers

from pyspark.sql.functions import *
from pyspark.sql.types import *
import dlt

@dlt.view(
  comment="The customers buying finished products, ingested from /databricks-datasets."
)
def customers():
  return spark.read.csv('/databricks-datasets/retail-org/customers/customers.csv', header=True)

Sales_Orders_Raw

@dlt.table(
  comment="The raw sales orders, ingested from /databricks-datasets.",
  table_properties={
    "myCompanyPipeline.quality": "bronze",
    "pipelines.autoOptimize.managed": "true"
  }
)
def sales_orders_raw():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.schemaLocation", "/tmp/john.odwyer/pythonsalestest")
      .option("cloudFiles.format", "json")
      .option("cloudFiles.inferColumnTypes", "true")
      .load("/databricks-datasets/retail-org/sales_orders/")
  )
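
Here format("cloudFiles") is Auto Loader: the table is defined as a streaming table that incrementally picks up new files landing in the source directory, with the inferred schema tracked under cloudFiles.schemaLocation. For comparison, the batch customers view above could also be written as a streaming ingest; a rough sketch (the table name and schema location path are made up for illustration):

@dlt.table(comment="Illustration only: customers ingested incrementally with Auto Loader.")
def customers_stream():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .option("cloudFiles.schemaLocation", "/tmp/example/customers_schema")
      .option("header", "true")
      .load("/databricks-datasets/retail-org/customers/")
  )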

Sales_Order_Cleaned


@dlt.table(
  comment="The cleaned sales orders with valid order_number(s) and partitioned by order_date",
  partition_cols=["order_date"],
  table_properties={
    "myCompanyPipeline.quality": "silver",
    "pipelines.autoOptimize.managed": "true"
  }
)
@dlt.expect_or_drop("valid order_number", "order_number IS NOT NULL")
def sales_orders_cleaned():
  df = dlt.read_stream("sales_orders_raw").join(dlt.read("customers"), ["customer_id", "customer_name"], "left")
  df = df.withColumn("order_datetime", from_unixtime(df.order_datetime).cast("TIMESTAMP"))
  df = df.withColumn("order_date", df.order_datetime.cast("DATE"))
  df = df.select("customer_id", "customer_name", "number_of_line_items", "order_datetime", "order_date",
    "order_number", "ordered_products", "state", "city", "lon", "lat", "units_purchased", "loyalty_segment")
  return df
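
A note on the @dlt.expect_or_drop decorator above: it drops rows where order_number is NULL and records the violations in the pipeline's event log. DLT also provides @dlt.expect (record violations but keep the rows) and @dlt.expect_or_fail (abort the update), plus *_all variants that take several constraints at once. A small sketch of a stricter table, where the table name sales_orders_strict and the second constraint are made up for illustration:

@dlt.table(
  comment="Illustration only: fail the pipeline update instead of dropping bad rows."
)
@dlt.expect_all_or_fail({
  "valid order_number": "order_number IS NOT NULL",
  "valid customer_id": "customer_id IS NOT NULL"
})
def sales_orders_strict():
  return dlt.read_stream("sales_orders_raw")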

Sales_Order_In_LA

@dlt.table(
  comment="Aggregated sales orders in LA",
  table_properties={
    "myCompanyPipeline.quality": "gold",
    "pipelines.autoOptimize.managed": "true"
  }
)
def sales_order_in_la():
  df = dlt.read_stream("sales_orders_cleaned").where("city == 'Los Angeles'")
  df = df.select(df.city, df.order_date, df.customer_id, df.customer_name, explode(df.ordered_products).alias("ordered_products_explode"))

  dfAgg = df.groupBy(df.order_date, df.city, df.customer_id, df.customer_name, df.ordered_products_explode.curr.alias("currency"))\
    .agg(sum(df.ordered_products_explode.price).alias("sales"), sum(df.ordered_products_explode.qty).alias("quantity"))

  return dfAgg


Sales_Order_In_Chicago


@dlt.table(
  comment="Sales orders in Chicago",
  table_properties={
    "myCompanyPipeline.quality": "gold",
    "pipelines.autoOptimize.managed": "true"
  }
)
def sales_order_in_chicago():
  df = dlt.read_stream("sales_orders_cleaned").where("city == 'Chicago'")
  df = df.select(df.city, df.order_date, df.customer_id, df.customer_name, explode(df.ordered_products).alias("ordered_products_explode"))

  dfAgg = df.groupBy(df.order_date, df.city, df.customer_id, df.customer_name, df.ordered_products_explode.curr.alias("currency"))\
    .agg(sum(df.ordered_products_explode.price).alias("sales"), sum(df.ordered_products_explode.qty).alias("quantity"))

  return dfAgg
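
Sales_Order_In_LA and Sales_Order_In_Chicago differ only in the city filter. If you want to avoid repeating yourself, DLT's Python API lets you set the table name explicitly with the name argument and generate the tables in a loop; a hedged sketch of that pattern, where the helper name create_city_sales_table is made up for illustration:

def create_city_sales_table(table_suffix, city):
  @dlt.table(
    name=f"sales_order_in_{table_suffix}",
    comment=f"Aggregated sales orders in {city}",
    table_properties={
      "myCompanyPipeline.quality": "gold",
      "pipelines.autoOptimize.managed": "true"
    }
  )
  def aggregate():
    df = dlt.read_stream("sales_orders_cleaned").where(f"city == '{city}'")
    df = df.select(df.city, df.order_date, df.customer_id, df.customer_name,
      explode(df.ordered_products).alias("ordered_products_explode"))
    return (df.groupBy(df.order_date, df.city, df.customer_id, df.customer_name,
        df.ordered_products_explode.curr.alias("currency"))
      .agg(sum(df.ordered_products_explode.price).alias("sales"),
           sum(df.ordered_products_explode.qty).alias("quantity")))
  return aggregate

# One gold table per city, all from the same definition
for suffix, city in [("la", "Los Angeles"), ("chicago", "Chicago")]:
  create_city_sales_table(suffix, city)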


Note that the first function is named customers, and the pipeline has a corresponding stage called customers that reads from the CSV file customers.csv.

This is what the final output pipeline looks like.

You will notice that the function names are tied to the different stages in the pipeline.
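
If you want a stage name that differs from the function name, both @dlt.table and @dlt.view accept a name argument; a quick sketch (the function name load_customer_csv is made up for illustration):

@dlt.view(
  name="customers",
  comment="Same data as above, but the stage name no longer depends on the function name."
)
def load_customer_csv():
  return spark.read.csv('/databricks-datasets/retail-org/customers/customers.csv', header=True)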






