Ceci est une ancienne révision du document !
Table des matières
Notes Python multithreading
Voir :
- asyncio / aiohttp / multiprocess / trio / grequests
Équivalent à defer (go lang) :
Nombre de CPU logiques
import multiprocessing multiprocessing.cpu_count()
Exemples
from threading import Thread import requests THREAD_COUNT = 6 def callback(): try: while True: r = requests.get('http://www.emrecetin.net') print(r) except KeyboardInterrupt: return if __name__ == '__main__': threads = [] for i in range(THREAD_COUNT): t = Thread(target=callback) threads.append(t) t.start() for t in threads: t.join()
Source : https://gist.github.com/emrectn/aea6d955b37bd15687d0112d236f8a3b
import requests import threading def make_request(url): response = requests.get(url) print(f"Response from {url}: {response.status_code}") # List of URLs to make requests to urls = [ "https://www.example.com", "https://www.google.com", "https://www.wikipedia.org", "https://www.python.org" ] # Create and start threads for each URL threads = [] for url in urls: thread = threading.Thread(target=make_request, args=(url,)) thread.start() threads.append(thread) # Wait for all threads to finish for thread in threads: thread.join()
Source : https://www.w3resource.com/python-exercises/threading/python-multi-threading-exercise-7.php
import grequests class Test: def __init__(self): self.urls = [ 'http://www.example.com', 'http://www.google.com', 'http://www.yahoo.com', 'http://www.stackoverflow.com/', 'http://www.reddit.com/' ] def exception(self, request, exception): print "Problem: {}: {}".format(request.url, exception) def async(self): results = grequests.map((grequests.get(u) for u in self.urls), exception_handler=self.exception, size=5) print results test = Test() test.async()
Source : https://stackoverflow.com/questions/38280094/python-requests-with-multithreading
Alternatives Multi threads à map
Exemple 1
Voir :
import concurrent.futures with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor: executor.map(get_dataset, URLS)
ou encore
from multiprocessing import Pool p = Pool(12) p.map(process_archive, zip_files)
Voir : https://softhints.com/parallel-processing-zip-archive-csv-files-python-and-pandas/
Exemple 2
from multiprocessing import Pool pool = Pool() result1 = pool.apply_async(solve1, [A]) # evaluate "solve1(A)" asynchronously result2 = pool.apply_async(solve2, [B]) # evaluate "solve2(B)" asynchronously answer1 = result1.get(timeout=10) answer2 = result2.get(timeout=10)
args = [A, B] results = pool.map(solve1, args)
Source : https://stackoverflow.com/questions/20548628/how-to-do-parallel-programming-in-python
Exemple 3
You can apply the function to every element in a list using the map() function:
list(map(square, [1, 2, 3, 4, 5, 6]))
The multiprocessing.pool.Pool class provides an equivalent but parallelized (via multiprocessing) way of doing this. The pool class, by default, creates one new process per CPU and does parallel calculations on the list:
from multiprocessing import Pool with Pool() as pool: pool.map(square, [1, 2, 3, 4, 5, 6])
Source : https://aaltoscicomp.github.io/python-for-scicomp/parallel/
Notes async asyncio await
Voir :
Async Queues :
A savoir :
- Vous pouvez uniquement utiliser
awaitdans les fonctions créées avecasync def - Une Coroutune déclarée avec
async defne peut être appelée que parawaitsi le résultat doit être attenducreate_task()(asyncio.create_task()ouasyncio.TaskGroup) pour ne pas attendre le résultatasyncio.gather()asyncio.run()
- “Coroutine” est juste un terme élaboré pour désigner ce qui est retourné par une fonction définie avec
async def. Python sait que c'est comme une fonction classique qui va démarrer à un moment et terminer à un autre, mais qu'elle peut aussi être mise en pause, du moment qu'il y a unawaitdans son contenu. time.sleep()(ainsi que d'autres lib/fonction) n'est pas compatible avecasyncio. A la place il faut utiliserasyncio.sleep()
Sleep
An important use for sleep in asyncio programs is to suspend the current task and allow other coroutines to execute.
It is important because although a task or coroutine can easily schedule new tasks via the create_task() or gather() function, the scheduled tasks will not begin executing until the current task is suspended.
Even sleeping for zero seconds is enough to suspend the current task and give an opportunity to other tasks to run.
For example:
# allow other tasks to run for a moment await asyncio.sleep(0)
Finally, a good use for sleep is to simulate blocking tasks in a concurrent program.
Autres
Ne pas quiter avant la fin du traitement des tâches
async def main(): # Create some tasks. for _ in range(10): asyncio.create_task(asyncio.sleep(10)) # Wait for all other tasks to finish other than the current task i.e. main(). await asyncio.gather(*asyncio.all_tasks() - {asyncio.current_task()})
asyncio & threading
import asyncio import threading async def something_async(): print('something_async start in thread:', threading.current_thread()) await asyncio.sleep(1) print('something_async done in thread:', threading.current_thread()) def main(): t1 = threading.Thread(target=asyncio.run, args=(something_async(), )) t2 = threading.Thread(target=asyncio.run, args=(something_async(), )) t1.start() t2.start() t1.join() t2.join() if __name__ == '__main__': main()
Source : https://stackoverflow.com/questions/57234827/wait-for-async-function-to-complete
Autre
import time from random import randint period = 1 # Second def get_epoch_ms(): return int(time.time() * 1000.0) async def do_something(name): print("Start :", name, get_epoch_ms()) try: # Do something which may takes more than 1 secs. slp = randint(1, 5) print("Sleep :", name, get_epoch_ms(), slp) await asyncio.sleep(slp) except Exception as e: print("Error :", e) print("Finish :", name, get_epoch_ms()) loop = asyncio.get_event_loop() futures = [loop.create_task(do_something('T' + str(i))) for i in range(5)] #loop.run_forever() #for f in futures: # f.cancel() for f in futures: loop.run_until_complete(f)
Source : https://stackoverflow.com/questions/56318648/how-to-run-an-asyncio-task-without-awaiting
Source : nsclient NSCP-0.8.0-x64/scripts/python
badapp.py
#! /usr/bin/env python3 import threading class BadThread(threading.Thread): id = -1 def __init__(self, id): self.id = id threading.Thread.__init__(self) def run(self): i = 0 while(True): i = i + 1 if i > 100000: print('Processing: %d'%self.id) i = 0 for x in range(1000): BadThread(x).start()
