Databricks Delta Live Tables intro
A Delta Live Table is a declarative way to process data in a pipeline: you write the transformations as code in a notebook, and Databricks generates a pipeline from it.
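As a minimal sketch (the table name my_first_table and the tiny DataFrame are placeholders I made up for illustration, not part of the example below), a Delta Live Table definition is just a decorated Python function that returns a DataFrame:

import dlt

@dlt.table(comment="A tiny illustrative table.")
def my_first_table():
    # Whatever DataFrame this function returns becomes the table's contents
    # when the pipeline runs.
    return spark.range(5)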
Let's say you have the following code:
Customers
from pyspark.sql.functions import *
from pyspark.sql.types import *
import dlt
@dlt.view(
    comment="The customers buying finished products, ingested from /databricks-datasets."
)
def customers():
    return spark.read.csv('/databricks-datasets/retail-org/customers/customers.csv', header=True)
Sales_Orders_Raw
@dlt.table(
    comment="The raw sales orders, ingested from /databricks-datasets.",
    table_properties={
        "myCompanyPipeline.quality": "bronze",
        "pipelines.autoOptimize.managed": "true"
    }
)
def sales_orders_raw():
    return (
        spark.readStream.format("cloudFiles")
            .option("cloudFiles.schemaLocation", "/tmp/john.odwyer/pythonsalestest")
            .option("cloudFiles.format", "json")
            .option("cloudFiles.inferColumnTypes", "true")
            .load("/databricks-datasets/retail-org/sales_orders/")
    )
Sales_Order_Cleaned
@dlt.table(
    comment="The cleaned sales orders with valid order_number(s) and partitioned by order_date",
    partition_cols=["order_date"],
    table_properties={
        "myCompanyPipeline.quality": "silver",
        "pipelines.autoOptimize.managed": "true"
    }
)
@dlt.expect_or_drop("valid order_number", "order_number IS NOT NULL")
def sales_orders_cleaned():
    df = dlt.read_stream("sales_orders_raw").join(dlt.read("customers"), ["customer_id", "customer_name"], "left")
    df = df.withColumn("order_datetime", from_unixtime(df.order_datetime).cast("TIMESTAMP"))
    df = df.withColumn("order_date", df.order_datetime.cast("DATE"))
    df = df.select("customer_id", "customer_name", "number_of_line_items", "order_datetime", "order_date",
                   "order_number", "ordered_products", "state", "city", "lon", "lat", "units_purchased", "loyalty_segment")
    return df
Sales_Order_In_LA
@dlt.table(
    comment="Aggregated sales orders in LA",
    table_properties={
        "myCompanyPipeline.quality": "gold",
        "pipelines.autoOptimize.managed": "true"
    }
)
def sales_order_in_la():
    df = dlt.read_stream("sales_orders_cleaned").where("city == 'Los Angeles'")
    df = df.select(df.city, df.order_date, df.customer_id, df.customer_name,
                   explode(df.ordered_products).alias("ordered_products_explode"))
    dfAgg = df.groupBy(df.order_date, df.city, df.customer_id, df.customer_name,
                       df.ordered_products_explode.curr.alias("currency")) \
        .agg(sum(df.ordered_products_explode.price).alias("sales"),
             sum(df.ordered_products_explode.qty).alias("quantity"))
    return dfAgg
Sales_Order_In_Chicago
@dlt.table(
    comment="Sales orders in Chicago",
    table_properties={
        "myCompanyPipeline.quality": "gold",
        "pipelines.autoOptimize.managed": "true"
    }
)
def sales_order_in_chicago():
    df = dlt.read_stream("sales_orders_cleaned").where("city == 'Chicago'")
    df = df.select(df.city, df.order_date, df.customer_id, df.customer_name,
                   explode(df.ordered_products).alias("ordered_products_explode"))
    dfAgg = df.groupBy(df.order_date, df.city, df.customer_id, df.customer_name,
                       df.ordered_products_explode.curr.alias("currency")) \
        .agg(sum(df.ordered_products_explode.price).alias("sales"),
             sum(df.ordered_products_explode.qty).alias("quantity"))
    return dfAgg
Note that the first function is named customers, and the pipeline gets a stage called customers, which reads from the CSV file customers.csv.
This is what the final output pipeline looks like.
You will notice that the function names are tied to the different stages in the pipeline.
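Once the pipeline has run, the generated tables can be queried like any other table from a regular notebook. A quick sketch, assuming the pipeline's target database is named retail_pipeline (a made-up name here; use whatever target you configured in the pipeline settings):

# retail_pipeline is a hypothetical target database set in the pipeline settings.
df = spark.table("retail_pipeline.sales_order_in_la")
df.orderBy("order_date").show(10)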