Problem
You are trying to use applyInPandasWithState with Delta Live Tables, but execution fails with a ModuleNotFoundError: No module named 'helpers' error message.
Example error
Traceback (most recent call last):
File "/databricks/spark/python/pyspark/worker.py", line 1964, in main
func, profiler, deserializer, serializer = read_udfs(pickleSer, infile, eval_type)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/databricks/spark/python/pyspark/worker.py", line 1770, in read_udfs
arg_offsets, f = read_single_udf(
^^^^^^^^^^^^^^^^
File "/databricks/spark/python/pyspark/worker.py", line 802, in read_single_udf
f, return_type = read_command(pickleSer, infile)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/databricks/spark/python/pyspark/worker_util.py", line 70, in read_command
command = serializer._read_with_length(file)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/databricks/spark/python/pyspark/serializers.py", line 196, in _read_with_length
raise SerializationError("Caused by " + traceback.format_exc())
pyspark.serializers.SerializationError: Caused by Traceback (most recent call last):
File "/databricks/spark/python/pyspark/serializers.py", line 192, in _read_with_length
return self.loads(obj)
^^^^^^^^^^^^^^^
File "/databricks/spark/python/pyspark/serializers.py", line 572, in loads
return cloudpickle.loads(obj, encoding=encoding)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
ModuleNotFoundError: No module named 'helpers'
Cause
applyInPandasWithState does not work correctly with Delta Live Tables when the function you pass to it is defined outside of your notebook.
In this case, the count_fn function is defined in the helpers.streaming.functions module. It is imported at the start of the example code block and then called as part of applyInPandasWithState, which results in the error.
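The helpers module itself is not shown in this article. As a point of reference only, a count_fn compatible with the output and state schemas used in the example below could look roughly like the following sketch; the actual implementation in helpers.streaming.functions may differ.
import pandas as pd

def count_fn(key, pdf_iter, state):
    # Count the rows received for this key, adding any count already held in state.
    total = sum(len(pdf) for pdf in pdf_iter)
    if state.exists:
        (previous,) = state.get
        total += previous
    state.update((total,))  # state schema: len long
    # Emit one row matching the output schema: id long, countAsString string.
    yield pd.DataFrame({"id": [key[0]], "countAsString": [str(total)]})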
Example code
%python
import pandas as pd
from pyspark.sql.functions import col
from pyspark.sql.streaming.state import GroupStateTimeout
from helpers.streaming.functions import count_fn
from pyspark.sql.functions import udf

# Streaming rate source, grouped by id, with the externally defined count_fn
# applied as a stateful function per group
df = (
    spark.readStream.format("rate")
    .option("rowsPerSecond", "100")
    .load()
    .withColumn("id", col("value"))
    .groupby("id")
    .applyInPandasWithState(
        func=count_fn,
        outputStructType="id long, countAsString string",
        stateStructType="len long",
        outputMode="append",
        timeoutConf=GroupStateTimeout.NoTimeout,
    )
)

import dlt
import time

# Materializing the stream as a Delta Live Tables table fails: the worker
# cannot unpickle count_fn because the helpers module is not found
@dlt.table(name=f"random_{int(time.time())}")
def a():
    return df
Solution
Define the function you want to use within the notebook, and reimport the function you want to call inside your custom function. Calling the notebook-defined function completes as expected.
The following custom function imports count_fn and runs it. Adding it to the sample code and calling my_func instead of count_fn directly allows the example code to complete successfully.
def my_func(*args):
    from helpers.streaming.functions import count_fn
    return count_fn(*args)
Example code
%python
import pandas as pd
from pyspark.sql.functions import col
from pyspark.sql.streaming.state import GroupStateTimeout
from helpers.streaming.functions import count_fn
from pyspark.sql.functions import udf

# Wrapper defined in the notebook; the helpers import runs only when the
# function is called
def my_func(*args):
    from helpers.streaming.functions import count_fn
    return count_fn(*args)

df = (
    spark.readStream.format("rate")
    .option("rowsPerSecond", "100")
    .load()
    .withColumn("id", col("value"))
    .groupby("id")
    .applyInPandasWithState(
        func=my_func,
        outputStructType="id long, countAsString string",
        stateStructType="len long",
        outputMode="append",
        timeoutConf=GroupStateTimeout.NoTimeout,
    )
)

import dlt
import time

# Materialize the stream as a Delta Live Tables table; with my_func the
# pipeline completes as expected
@dlt.table(name=f"random_{int(time.time())}")
def a():
    return df