Skip to main content
Version: Next

Python Transforms

In addition to SQL transforms, you may run Python transforms in your pipeline. Python transforms are defined by a Python function that takes a DataFrame as input and returns a DataFrame as output. The function is then registered as a transform in the pipeline.

Exactly like the SQL transform, you can define a python transform by creating a python file and adding it to the metadata/transform/<domain> directory.

You can also define a YAML configuration file for the python script using the exact same format as the SQL transform.

Arguments specified in the command line through the --options flag will be passed to the python function as keyword arguments:


$ starlake transform --name <domain>.<transform_name> --options key1=value1,key2=value2

will be passed as keyword arguments to the python function:


<transform-name>.py --key1 value1 --key2 value2

The dataframe returned by the python function will be saved as the output table of the transform.

note

Before returning the dataframe, the function must create a temporary view with the name SL_THIS so that the dataframe can be saved as a table.


import sys
from random import random
from operator import add

from pyspark.sql import SparkSession

if __name__ == "__main__":
"""
Usage: pi [partitions]
"""
spark = SparkSession \
.builder \
.getOrCreate()

partitions = 2
n = 100000 * partitions

def f(_: int) -> float:
x = random() * 2 - 1
y = random() * 2 - 1
return 1 if x ** 2 + y ** 2 <= 1 else 0

count = spark.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
result = "Pi is roughly %f" % (4.0 * count / n)
df = spark.createDataFrame([[result]])
df.createOrReplaceTempView("SL_THIS")