AWS Lambda multiprocessing.ThreadPool function issue when using db-connect 14 and 15 in Databricks

Use db-connect 16 in Databricks.

Written by lucas.rocha

Last published at: January 16th, 2025

Problem

When you attempt to use Databricks Connect for Databricks Runtime 14.1 through 15.4 LTS within an AWS Lambda function, you receive a Function not implemented error message.

 

Example error

[ERROR] OSError: [Errno 38] Function not implemented
Traceback (most recent call last):
  File "/var/task/src/check_status.py", line 129, in lambda_handler
    holdout_in_research = get_holdout_in_research(
  File "/var/lang/lib/python3.11/site-packages/tenacity/__init__.py", line 289, in wrapped_f
    return self(f, *args, **kw)
  File "/var/lang/lib/python3.11/site-packages/tenacity/__init__.py", line 379, in __call__
    do = self.iter(retry_state=retry_state)
  File "/var/lang/lib/python3.11/site-packages/tenacity/__init__.py", line 314, in iter
    return fut.result()
  File "/var/lang/lib/python3.11/concurrent/futures/_base.py", line 449, in result
    return self.__get_result()
[...]

 

Cause

Databricks Connect started using Python's multiprocessing.pool.ThreadPool class in version 14. Constructing a ThreadPool requires shared memory (POSIX semaphore) support, which is not available in the AWS Lambda execution environment.

For more information, review the Parallel Processing in Python with AWS Lambda documentation.
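The failure can be reproduced outside of Databricks Connect, since any construction of a multiprocessing lock needs the same POSIX semaphore support. The following is a minimal sketch (the function name is illustrative) that probes for that support; on AWS Lambda the lock creation raises OSError with errno 38 (ENOSYS, "Function not implemented"), while on a typical Linux or macOS host it succeeds.

```python
import errno
import multiprocessing

def has_multiprocessing_sem_support() -> bool:
    """Return True if this environment can create multiprocessing locks.

    multiprocessing.Lock() is backed by a named POSIX semaphore, the same
    primitive that multiprocessing.pool.ThreadPool relies on internally.
    """
    try:
        multiprocessing.Lock()
        return True
    except OSError as e:
        if e.errno == errno.ENOSYS:
            # "Function not implemented" -- the sandbox lacks sem_open support.
            return False
        raise

if __name__ == "__main__":
    print(has_multiprocessing_sem_support())
```

Running this inside the Lambda sandbox prints False, which confirms the environment, rather than your code, is the source of the error.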

 

Solution

You can avoid this issue by upgrading to Databricks Connect for Databricks Runtime 16.0 or above.

To work around this issue with Databricks Connect versions 14.x and 15.x, mock the methods that use multiprocessing.pool.ThreadPool. You can do this by creating a custom patch that intercepts the module import and applies the necessary changes.

 

Step 1

Create a new Python file (e.g., `pyspark_patch.py`) in the same path as your AWS Lambda function and add the example code.

 

Example code

The example code implements a monkey patch for the ExecutePlanResponseReattachableIterator class from PySpark's SQL Connect client. The modifications address the handling of thread pooling and the release of execution resources during iterative processing.

import os
import warnings
from threading import RLock
from concurrent.futures import ThreadPoolExecutor
from pyspark.sql.connect.client.reattach import ExecutePlanResponseReattachableIterator

# Override class variables
ExecutePlanResponseReattachableIterator._lock = RLock()
ExecutePlanResponseReattachableIterator._release_thread_pool_instance = None

# Define the new _release_thread_pool class method
@classmethod
@property
def _release_thread_pool(cls):
  if cls._release_thread_pool_instance is not None:
    return cls._release_thread_pool_instance
  with cls._lock:
    if cls._release_thread_pool_instance is None:
      max_workers = os.cpu_count() or 8
      cls._release_thread_pool_instance = ThreadPoolExecutor(max_workers=max_workers)
    return cls._release_thread_pool_instance

# Monkey patch the _release_thread_pool method
ExecutePlanResponseReattachableIterator._release_thread_pool = _release_thread_pool

# Redefine and monkey patch the _release_until method
def _release_until(self, until_response_id: str) -> None:
  if self._result_complete:
    return

  request = self._create_release_execute_request(until_response_id)

  def target() -> None:
    try:
      for attempt in self._retrying():
        with attempt:
          self._stub.ReleaseExecute(request, metadata=self._metadata)
    except Exception as e:
      warnings.warn(f"ReleaseExecute failed with exception: {e}.")

  with self._lock:
    if self._release_thread_pool_instance is not None:
      self._release_thread_pool.submit(target)

ExecutePlanResponseReattachableIterator._release_until = _release_until

# Redefine and monkey patch the _release_all method
def _release_all(self) -> None:
  if self._result_complete:
    return

  request = self._create_release_execute_request(None)

  def target() -> None:
    try:
      for attempt in self._retrying():
        with attempt:
          self._stub.ReleaseExecute(request, metadata=self._metadata)
    except Exception as e:
      warnings.warn(f"ReleaseExecute failed with exception: {e}.")

  with self._lock:
    if self._release_thread_pool_instance is not None:
      self._release_thread_pool.submit(target)
  self._result_complete = True

ExecutePlanResponseReattachableIterator._release_all = _release_all
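The core of the patch is swapping the eager multiprocessing.pool.ThreadPool for a lazily created, lock-guarded concurrent.futures.ThreadPoolExecutor singleton. That pattern can be exercised in isolation with a stand-alone dummy class (all names here are illustrative, not part of PySpark), using a plain classmethod rather than the chained classmethod/property decorator, which is deprecated since Python 3.11:

```python
import os
from threading import RLock
from concurrent.futures import ThreadPoolExecutor

class LazyPoolDemo:
    """Illustrative stand-in for the patched PySpark class.

    The executor is created on first use instead of at import time, and the
    creation is guarded by a lock so concurrent callers share one instance.
    """
    _lock = RLock()
    _pool_instance = None

    @classmethod
    def pool(cls) -> ThreadPoolExecutor:
        if cls._pool_instance is not None:
            return cls._pool_instance
        with cls._lock:
            # Double-checked locking: re-test under the lock before creating.
            if cls._pool_instance is None:
                max_workers = os.cpu_count() or 8
                cls._pool_instance = ThreadPoolExecutor(max_workers=max_workers)
            return cls._pool_instance

if __name__ == "__main__":
    a = LazyPoolDemo.pool()
    b = LazyPoolDemo.pool()
    print(a is b)                                  # True: a single shared executor
    print(a.submit(lambda: 21 * 2).result())       # 42
```

Because ThreadPoolExecutor is built on threading primitives only, it needs no POSIX semaphores and works inside the Lambda sandbox.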

 

Step 2

Import the pyspark_patch.py file you just created as the first import at the top of your AWS Lambda code, so the patch is applied before Databricks Connect is used.

 

Example code

This is an example of a Lambda function using the patch.

import pyspark_patch
from databricks.connect import DatabricksSession

HOST = "..."
TOKEN = "..."
CLUSTER_ID = "..."

def lambda_handler(event, context):
  spark = DatabricksSession.builder.remote(
      host=HOST,
      token=TOKEN,
      cluster_id=CLUSTER_ID,
  ).getOrCreate()

  df = spark.table(...)

 

By applying this workaround, you bypass multiprocessing.pool.ThreadPool and can use Databricks Connect versions 14.x and 15.x in AWS Lambda.