The way we can imagine Threading is like adding a new lanes to a highway.
If you have a single lane highway, then all cars have to follow each other and, if one car is going slow, then all the cars behind it have to go slow too. But, if you have a second or third lane, then the faster cars can pass the slower cars.
Let’s get into Threading with a toy example that prints names and ages:
import threading
import time
import random
def print_names():
for name in ("John", "Kate", "Mike", "Alex", "Ann"):
print(name)
time.sleep(random.uniform(0.5, 1.5))
def print_age():
for _ in range(5):
print(random.randint(20, 50))
time.sleep(random.uniform(0.5, 1.5))
print_names()
print_age()Output:
John
Kate
Mike
Alex
Ann
26
23
41
31
49
This is the expected behaviour. Now, let’s:
- create two
Threadobjects - then call them do to some stuff
- use
join()on both threads. The way we can think aboutjoin()is that we’re joining back up with the main thread: we split out the highway into multiple lanes and then we are bringing now all those multiple lines into a single lane
t1 = threading.Thread(target=print_names)
t2 = threading.Thread(target=print_age)
t1.start()
t2.start()
t1.join()
t2.join()Output:
John
47
Kate
26
Mike
24
21
Alex
24
Ann
As we can see, the execution is passing back and forth between the first thread and the second thread: basically when the thread 1 is sleeping, the execution gets passed over to thread 2 and viceversa.
Passing arguments into a function
If we want to pass some arguments to our function, we can use the args parameter in the Thread() class instantiation:
def print_names():
for name in ("John", "Kate", "Mike", "Alex", "Ann"):
print(name)
time.sleep(random.uniform(0.5, 1.5))
def print_age(min_sleep, max_sleep):
for _ in range(5):
print(random.randint(20, 50))
time.sleep(random.uniform(min_sleep, max_sleep))
t1 = threading.Thread(target=print_names)
t2 = threading.Thread(target=print_age, args=(0.2, 1))
t1.start()
t2.start()
t1.join()
t2.join()Output:
John
28
29
Kate
46
46
31
Mike
Alex
Ann
As we can see the ages are running significantly faster than the names.
More common example
Let’s take a look now into a more common example that you would actually want to use threading for. But, firstly, let’s talk about Threads in Python because it might be a bit different than in other languages.
In Python, Threads aren’t truly parallel. This is due to the Global Interpreter Lock (GIL). The GIL prevents multiple threads from executing Python by code at the same time. There are many reasons for this and there are pros and cons to the GIL, but the main reason is it makes much easier to write thread-safe code. The main thing to keep in mind when deciding whether to use threads or not is whether your task is CPU-bound or I/O-bound. CPU-bound tasks are tasked to require a lot of computation whereas for I/O bound tasks they do involve a good amount of waiting around (whether that is for user input, whether you’re downloading files, whether you’re reaching out to APIs, whether you’re writing files, etc…). For those CPU-bound tasks, you are likely going to want to use Multi-Processing (TODO: add link), but for I/O-bound tasks, Threading is the way to go.
Let’s give an example where we just want to download some files from a website:
import threading
import requests
from pathlib import Path
def download_file(url, filename):
print(f"Downloading {url} to {filename}")
response = requests.get(url)
Path(filename).write_bytes(response.content)
print(f"Finished downloaind {filename}")
base_url = "https://raw.githubusercontent.com/JacobCallahan/Understanding/master/Python/file_io"
urls = [
f"{base_url}/binary_file",
f"{base_url}/files.py",
f"{base_url}/names.txt",
f"{base_url}/new_file.txt",
]
threads = []
for url in urls:
print(url)
filename = "threading_lesson_download_folder/" + url.split("/")[-1]
print(filename)
t = threading.Thread(target=download_file, args=(url, filename))
t.start()
threads.append(t)
[t.join() for t in threads]Output:
https://raw.githubusercontent.com/JacobCallahan/Understanding/master/Python/file_io/binary_file
threading_lesson_download_folder/binary_file
Downloading https://raw.githubusercontent.com/JacobCallahan/Understanding/master/Python/file_io/binary_file to threading_lesson_download_folder/binary_file
https://raw.githubusercontent.com/JacobCallahan/Understanding/master/Python/file_io/files.py
threading_lesson_download_folder/files.py
Downloading https://raw.githubusercontent.com/JacobCallahan/Understanding/master/Python/file_io/files.py to threading_lesson_download_folder/files.py
https://raw.githubusercontent.com/JacobCallahan/Understanding/master/Python/file_io/names.txt
threading_lesson_download_folder/names.txt
Downloading https://raw.githubusercontent.com/JacobCallahan/Understanding/master/Python/file_io/names.txt to threading_lesson_download_folder/names.txt
https://raw.githubusercontent.com/JacobCallahan/Understanding/master/Python/file_io/new_file.txt
threading_lesson_download_folder/new_file.txt
Downloading https://raw.githubusercontent.com/JacobCallahan/Understanding/master/Python/file_io/new_file.txt to threading_lesson_download_folder/new_file.txt
Finished downloaind threading_lesson_download_folder/binary_file
Finished downloaind threading_lesson_download_folder/names.txt
Finished downloaind threading_lesson_download_folder/new_file.txt
Finished downloaind threading_lesson_download_folder/files.py
[None, None, None, None]
Since some files took longer than others, we did get to see that Python pass back execution between the different threads, so the faster one (binary_file) finish before the slower ones.
While you could technically stop there, this isn’t the way I like to handle threads in my projects. In fact, I like to use something that was introduces midway through Python 3 development and that is using something it came from cuncurrent.future. This module gives us a convenient way to deal with threads and it offers an interface that makes a few things a little bit more simple. The first thing to notice is that we’re actually given a chance to use a context manager because, as we said in the lesson about context manager (TODO: add link), it is useful to deal with the clean up so we don’t have to do that manually.
We’ll use the ThreadPoolExecutor class where one of the most important argument is max_workers: remember that in the previous example we had four different URLs and we created a thread for each of those. The question is: did we really need four threads to download those four files? Actually it depends on your priorities, but that if you had 1000 files? Do you really want to spin up 1000 threads? For most people the answer is “NO”, unless you have a way of manually balancing workloads between threads and ThreadPoolExecutor are fantastic interface to do that for you because you can specify the maximum amount of Threads (max_workers) you want to use. Note that:
- instead of saving threads in
threadslist, we’re going to save these asfutures. You can think of futures as Python’s way representing the result of a threads, kind of like a placeholder for the result. - to create futures we do like we did before: we’re going to iterate through our URLs and we’re going to create a future by submitting a task with
submitmethod to our executor. This is the equivalent of creating threads in theThreadPoolExecutor. - to start the threads, again we’re going to iterate over something, in this case
as_completed()function which we pass in our list of futures and it gives us an iterator that yields the futures as they complete. So, all we have to do is iterate overconcurrent.futures.as_completed(futures)and then we can get the result as they come in. - To get the results our we use a try-except structure.
Here’s the updated code:
import concurrent.futures
def get_status(url):
print(f"Getting status of {url}")
start_time = time.monotonic()
response = requests.get(url)
total_time = time.monotonic() - start_time
print(f"Finished getting status of {url} in {total_time:.2f} seconds")
return url, response.status_code
urls = [
"https://www.google.com",
"https://www.facebook.com",
"https://www.twitter.com",
"https://www.github.com",
"https://www.linkedin.com"
]
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
futures = []
for url in urls:
future = executor.submit(get_status, url)
futures.append(future)
for future in concurrent.futures.as_completed(futures):
try:
url, code = future.result()
print(f"Status code for {url} is {code}")
except Exception as err:
print(f"Task failed! {err}")This might be more verbose compared to the previous example, but the biggest advantage now is that we have this “auto-balance” for us with ThreadPoolExecutor just by specifying max_workers, so we can limit the number of active threads that are going on.
Let’s see the output:
Getting status of https://www.google.com
Getting status of https://www.facebook.com
Finished getting status of https://www.facebook.com in 0.21 seconds
Getting status of https://www.twitter.com
Status code for https://www.facebook.com is 200
Finished getting status of https://www.google.com in 0.31 seconds
Getting status of https://www.github.com
Status code for https://www.google.com is 200
Finished getting status of https://www.github.com in 0.29 seconds
Getting status of https://www.linkedin.com
Status code for https://www.github.com is 200
Finished getting status of https://www.twitter.com in 0.65 seconds
Status code for https://www.twitter.com is 200
Finished getting status of https://www.linkedin.com in 0.31 seconds
Status code for https://www.linkedin.com is 200- Line 1 and 2: it’s getting the status of Google and Facebook approximately at the same time;
- Line 3: it finished getting the status of Facebook;
- Line 4: immediately it moves on to getting the status of Twitter. “Immediately” because this is happening even before we print out the status code for Facebook (line 28 of the source code above). So, when we’re saying
url, code = future.result()we’re getting that result and immediately after it’s popping the next task into the queue and then it lets us continue our execution of the line 28 (TODO: actually I don’t understand well this last bullet point).
Let’s add a bit of complexity and also explore some more things to consider with threading. Now, we add a functionality where we write the status to a file:
import concurrent.futures
import requests
import time
from pathlib import Path
STATUS_REPORT = Path("status_report.txt")
STATUS_REPORT.write_text("")
def get_status(url):
current_text = STATUS_REPORT.read_text()
print(f"Getting status of {url}")
start_time = time.monotonic()
response = requests.get(url)
total_time = time.monotonic() - start_time
print(f"Finished getting status of {url} in {total_time:.2f} seconds")
STATUS_REPORT.write_text(f"{current_text}{url}: {response.status_code}\n")
return url, response.status_code
urls = [
"https://www.google.com",
"https://www.facebook.com",
"https://www.twitter.com",
"https://www.github.com",
"https://www.linkedin.com"
]
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
futures = []
for url in urls:
future = executor.submit(get_status, url)
futures.append(future)
for future in concurrent.futures.as_completed(futures):
try:
url, code = future.result()
print(f"Status code for {url} is {code}")
except Exception as err:
print(f"Task failed! {err}")Output:
Getting status of https://www.google.comGetting status of https://www.facebook.com
Finished getting status of https://www.facebook.com in 0.20 seconds
Status code for https://www.facebook.com is 200
Getting status of https://www.twitter.com
Finished getting status of https://www.google.com in 0.33 seconds
Status code for https://www.google.com is 200
Getting status of https://www.github.com
Finished getting status of https://www.github.com in 0.30 seconds
Status code for https://www.github.com is 200
Getting status of https://www.linkedin.com
Finished getting status of https://www.twitter.com in 0.66 seconds
Status code for https://www.twitter.com is 200
Finished getting status of https://www.linkedin.com in 0.38 seconds
Status code for https://www.linkedin.com is 200
If we look at the file that was created status_report.txt, we can see that is a complete mess: it starts with a blank line, then we have status code about only three services while the remaining ones are missing. What’s happening? This is something that we need to keep in mind when working with threads, especially threads that are interacting with the same resources (in this case STATUS_REPORT). All these are trying to interact with the same file and this it could get worse if you use max_workers=5 because again all these threads are fighting of the file overriding what each one is doing. This is not ideal, so we need a way to deal with this.
There is a pretty simple mechanism coming from threading library. We’ll create an instance of Lock() class and we can use this to determine when different threads can interact with different things. There are a couple of way you can deal with this lock:
- use
REPORT_LOCK.acquire()to check if anything else has already acquire the lock and, if not, it will acquire the lock (basically it’s saying “I’m going now, anyone else behind me is gonna have to wait” then it does all the execution and withREPORT_LOCK.release()it’s saying “I’m done, so I’m releasing it for the rest of you”).
Then:
import concurrent.futures
import requests
import time
from pathlib import Path
import threading
STATUS_REPORT = Path("status_report.txt")
STATUS_REPORT.write_text("")
REPORT_LOCK = threading.Lock()
def get_status(url):
REPORT_LOCK.acquire()
current_text = STATUS_REPORT.read_text()
print(f"Getting status of {url}")
start_time = time.monotonic()
response = requests.get(url)
total_time = time.monotonic() - start_time
print(f"Finished getting status of {url} in {total_time:.2f} seconds")
STATUS_REPORT.write_text(f"{current_text}{url}: {response.status_code}\n")
REPORT_LOCK.release()
return url, response.status_code
urls = [
"https://www.google.com",
"https://www.facebook.com",
"https://www.twitter.com",
"https://www.github.com",
"https://www.linkedin.com"
]
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
futures = []
for url in urls:
future = executor.submit(get_status, url)
futures.append(future)
for future in concurrent.futures.as_completed(futures):
try:
url, code = future.result()
print(f"Status code for {url} is {code}")
except Exception as err:
print(f"Task failed! {err}")Output:
Getting status of https://www.google.com
Finished getting status of https://www.google.com in 0.35 seconds
Status code for https://www.google.com is 200
Getting status of https://www.facebook.com
Finished getting status of https://www.facebook.com in 0.19 seconds
Status code for https://www.facebook.com is 200
Getting status of https://www.twitter.com
Finished getting status of https://www.twitter.com in 0.70 seconds
Getting status of https://www.github.com
Status code for https://www.twitter.com is 200
Finished getting status of https://www.github.com in 0.30 seconds
Getting status of https://www.linkedin.com
Status code for https://www.github.com is 200
Finished getting status of https://www.linkedin.com in 0.42 seconds
Status code for https://www.linkedin.com is 200
If you look at the output, we can confirm that it works well. Even though this code is valid, it’s better to use a context manager:
import concurrent.futures
import requests
import time
from pathlib import Path
import threading
STATUS_REPORT = Path("status_report.txt")
STATUS_REPORT.write_text("")
REPORT_LOCK = threading.Lock()
def get_status(url):
with REPORT_LOCK:
current_text = STATUS_REPORT.read_text()
print(f"Getting status of {url}")
start_time = time.monotonic()
response = requests.get(url)
total_time = time.monotonic() - start_time
print(f"Finished getting status of {url} in {total_time:.2f} seconds")
STATUS_REPORT.write_text(f"{current_text}{url}: {response.status_code}\n")
return url, response.status_code
urls = [
"https://www.google.com",
"https://www.facebook.com",
"https://www.twitter.com",
"https://www.github.com",
"https://www.linkedin.com"
]
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
futures = []
for url in urls:
future = executor.submit(get_status, url)
futures.append(future)
for future in concurrent.futures.as_completed(futures):
try:
url, code = future.result()
print(f"Status code for {url} is {code}")
except Exception as err:
print(f"Task failed! {err}")Output:
Getting status of https://www.google.com
Finished getting status of https://www.google.com in 0.34 seconds
Status code for https://www.google.com is 200
Getting status of https://www.facebook.com
Finished getting status of https://www.facebook.com in 0.25 seconds
Status code for https://www.facebook.com is 200
Getting status of https://www.twitter.com
Finished getting status of https://www.twitter.com in 0.71 seconds
Status code for https://www.twitter.com is 200
Getting status of https://www.github.com
Finished getting status of https://www.github.com in 0.27 seconds
Getting status of https://www.linkedin.com
Status code for https://www.github.com is 200
Finished getting status of https://www.linkedin.com in 0.40 seconds
Status code for https://www.linkedin.com is 200
If you look at the output of the print statements, we can see that this has effectively made it sequential (first Google, second Facebook, and so on.). That’s because we are blocking thing for other threads at the very top our function. If you wanted to design this better, we could collapse the lock to only the context that it needs to be in. So, everything else that doens’t need that lock context can happen outside of it:
import concurrent.futures
import requests
import time
from pathlib import Path
import threading
STATUS_REPORT = Path("status_report.txt")
STATUS_REPORT.write_text("")
REPORT_LOCK = threading.Lock()
def get_status(url):
print(f"Getting status of {url}")
start_time = time.monotonic()
response = requests.get(url)
total_time = time.monotonic() - start_time
print(f"Finished getting status of {url} in {total_time:.2f} seconds")
with REPORT_LOCK:
current_text = STATUS_REPORT.read_text()
STATUS_REPORT.write_text(f"{current_text}{url}: {response.status_code}\n")
return url, response.status_code
urls = [
"https://www.google.com",
"https://www.facebook.com",
"https://www.twitter.com",
"https://www.github.com",
"https://www.linkedin.com"
]
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
futures = []
for url in urls:
future = executor.submit(get_status, url)
futures.append(future)
for future in concurrent.futures.as_completed(futures):
try:
url, code = future.result()
print(f"Status code for {url} is {code}")
except Exception as err:
print(f"Task failed! {err}")Output:
Getting status of https://www.google.com
Getting status of https://www.facebook.com
Getting status of https://www.twitter.com
Getting status of https://www.github.com
Getting status of https://www.linkedin.com
Finished getting status of https://www.facebook.com in 0.23 seconds
Status code for https://www.facebook.com is 200
Finished getting status of https://www.github.com in 0.37 seconds
Status code for https://www.github.com is 200
Finished getting status of https://www.google.com in 0.41 seconds
Status code for https://www.google.com is 200
Finished getting status of https://www.linkedin.com in 0.45 seconds
Status code for https://www.linkedin.com is 200
Finished getting status of https://www.twitter.com in 0.72 seconds
Status code for https://www.twitter.com is 200
TODO: TO FINISH this lesson!