ThreadPoolExecutor is the recommended way to perform multithreading in Python. Multithreading is a way to speed up your application and is suitable for I/O bound tasks such as HTTP requests, database operations, etc.
Introduction
The simplest way to show how to use ThreadPoolExecutor is by example:
The application above will print the statement “I’m running in a thread: “ with the number 1 to 10, which will most probably be out of order.
Code Explanation
First, we created a ThreadPoolExecutor with 2 threads. Then, we submit 10 tasks to the ThreadPoolExecutor with a for-in loop, and in turn receive Future objects which we store in a list. We then loop through the list of futures using the as_completed() function, which allows us to receive futures that complete first instead of going sequentially through the futures list. Finally, we obtain the result of the task() function by calling future.result() and print the outcome.
Handling Exceptions
The example above was super simple, but what if we need to do something more complicated like firing concurrent HTTP requests. There is a possibility that the code may hit Exceptions during runtime due to uncontrollable reasons i.e. network issues, configuration issues. We can put a try-except block to handle Exceptions, but what if we want to cancel all submitted tasks when a future fails?
Shutdown
There is a built in function for ThreadPoolExecutor called shutdown().
In Python 3.7 and 3.8, shutdown() only stops the ThreadPoolExecutor from accepting new tasks. This means that if we submit all our tasks in one go at the beginning, and mid-way through a task fails which causes the future to return an Exception, other pending tasks that have not started running and are still in the executor’s job queue will NOT be cancelled.
Fortunately, it was revamped in Python 3.9 to allow users to cancel pending tasks in the executor’s job queue.
Python 3.7 and 3.8
At the time of writing this blog post I was using Python 3.7.10 and Python 3.8.5.
In Python 3.7 and 3.8, shutdown() only accepts one boolean parameter, wait. When wait = True, Python will wait until all submitted tasks have finished running before shutting down the ThreadPoolExecutor.
When wait = False, it will still behave in the same way. The only difference is that the call to executor.shutdown() will not block. The executor will continue running all pending tasks, but will not accept new tasks and will completely shut down after all tasks are ran. Meanwhile, the main thread continues with our script’s main execution flow.
In the code example above, a ValueError exception will be raised when num = 5. After hitting the ValueError exception, the script will call shutdown().
When wait = True, Python will wait for all futures after num = 5 to complete and print it out (almost) all at once.
Since wait = True, after handling the ValueError exception, the main thread will wait for all tasks in the job queue to complete, before returning to the for-in loop to receive completed futures from as_completed() and to print out the results. Notice that the results printed after the ValueError exception are printed at approximately the same time.
Meanwhile, when wait = False, Python will continue with the main thread’s execution flow.
Since wait = False, after handling the ValueError exception, the main thread immediately continues with the script’s execution flow, which is to wait and receive completed futures from as_completed() and to print out the results. Notice that the results printed after the ValueError exception are printed as each future completes.
At the time of writing this blog post, I was using Python 3.9.5.
In Python 3.9, an additional boolean parameter, cancel_futures, was added to shutdown(). This allows us to cancel pending futures in the queue that have not started running if any future fails mid-way through. However, there is a catch. If you use the as_completed() function, the script will be stuck indefinitely after calling executor.shutdown() because as_completed() is not triggered when a future is in the CANCELLED state.
As you can see here, as_completed() only returns a future if it is in the CANCELLED_AND_NOTIFIED or FINISHED state. Meanwhile, executor.shutdown() only cancels the future but does not change its state to CANCELLED_AND_NOTIFIED.
To confirm it, we can run this block of code below.
We are printing the states of all the futures every time we enter the for-in loop. The results of the block of code above is as shown below:
We can see that after the ValueError exception is handled, the last two futures changes state from PENDING to CANCELLED. The script will then be stuck because as_completed() never returns the remaining two futures.
What To Do
The way I fixed this is to scrap using the as_completed() function. Although it is the recommended way to loop through a list of futures, it’s useless if it causes the script to hang.
Now, we are going through the list of futures sequentially. We are also checking if the future is in CANCELLED state and if it is, we proceed to the next future. We have successfully avoided the never-ending as_completed() function when .shutdown(cancel_futures=True) is called.
The output of the code above will look like:
Bonus: I Want To Cancel Futures On Shutdown in Python 3.7 and 3.8
With a bit of knowledge of object-oriented programming, we can! Simply inherit the ThreadPoolExecutor class and add a new method (in case we still want the original shutdown()) with Python 3.9’s version of shutdown().