Futures¶
Future objects are used to bridge low-level callback-based code with high-level async/await code.
Future Functions¶
Return True if obj is either of:
a Future-like object with a _asyncio_future_blocking attribute.
New in version 3.5.
obj argument as is, if obj is a Future , a Task , or a Future-like object ( isfuture() is used for the test.)
a Task object wrapping obj, if obj is a coroutine ( iscoroutine() is used for the test); in this case the coroutine will be scheduled by ensure_future() .
a Task object that would await on obj, if obj is an awaitable ( inspect.isawaitable() is used for the test.)
If obj is neither of the above a TypeError is raised.
See also the create_task() function which is the preferred way for creating new Tasks.
Save a reference to the result of this function, to avoid a task disappearing mid-execution.
Changed in version 3.5.1: The function accepts any awaitable object.
Deprecated since version 3.10: Deprecation warning is emitted if obj is not a Future-like object and loop is not specified and there is no running event loop.
Deprecated since version 3.10: Deprecation warning is emitted if future is not a Future-like object and loop is not specified and there is no running event loop.
Future Object¶
A Future represents an eventual result of an asynchronous operation. Not thread-safe.
Future is an awaitable object. Coroutines can await on Future objects until they either have a result or an exception set, or until they are cancelled. A Future can be awaited multiple times and the result is same.
Typically Futures are used to enable low-level callback-based code (e.g. in protocols implemented using asyncio transports ) to interoperate with high-level async/await code.
The rule of thumb is to never expose Future objects in user-facing APIs, and the recommended way to create a Future object is to call loop.create_future() . This way alternative event loop implementations can inject their own optimized implementations of a Future object.
Changed in version 3.7: Added support for the contextvars module.
Deprecated since version 3.10: Deprecation warning is emitted if loop is not specified and there is no running event loop.
Return the result of the Future.
If the Future is done and has a result set by the set_result() method, the result value is returned.
If the Future is done and has an exception set by the set_exception() method, this method raises the exception.
If the Future has been cancelled, this method raises a CancelledError exception.
If the Future’s result isn’t yet available, this method raises a InvalidStateError exception.
Mark the Future as done and set its result.
Raises a InvalidStateError error if the Future is already done.
Mark the Future as done and set an exception.
Raises a InvalidStateError error if the Future is already done.
Return True if the Future is done.
A Future is done if it was cancelled or if it has a result or an exception set with set_result() or set_exception() calls.
Return True if the Future was cancelled.
The method is usually used to check if a Future is not cancelled before setting a result or an exception for it:
Add a callback to be run when the Future is done.
The callback is called with the Future object as its only argument.
If the Future is already done when this method is called, the callback is scheduled with loop.call_soon() .
An optional keyword-only context argument allows specifying a custom contextvars.Context for the callback to run in. The current context is used when no context is provided.
functools.partial() can be used to pass parameters to the callback, e.g.:
Changed in version 3.7: The context keyword-only parameter was added. See PEP 567 for more details.
Remove callback from the callbacks list.
Returns the number of callbacks removed, which is typically 1, unless a callback was added more than once.
cancel ( msg = None ) ¶
Cancel the Future and schedule callbacks.
If the Future is already done or cancelled, return False . Otherwise, change the Future’s state to cancelled, schedule the callbacks, and return True .
Changed in version 3.9: Added the msg parameter.
Return the exception that was set on this Future.
The exception (or None if no exception was set) is returned only if the Future is done.
If the Future has been cancelled, this method raises a CancelledError exception.
If the Future isn’t done yet, this method raises an InvalidStateError exception.
Return the event loop the Future object is bound to.
New in version 3.7.
This example creates a Future object, creates and schedules an asynchronous Task to set result for the Future, and waits until the Future has a result:
AsyncIO для практикующего python-разработчика
Я помню тот момент, когда подумал «Как же медленно всё работает, что если я распараллелю вызовы?», а спустя 3 дня, взглянув на код, ничего не мог понять в жуткой каше из потоков, синхронизаторов и функций обратного вызова.
Тогда я познакомился с asyncio, и всё изменилось.
Если кто не знает, asyncio — новый модуль для организации конкурентного программирования, который появился в Python 3.4. Он предназначен для упрощения использования корутин и футур в асинхронном коде — чтобы код выглядел как синхронный, без коллбэков.
Я помню, в то время было несколько похожих инструментов, и один из них выделялся — это библиотека gevent. Я советую всем прочитать прекрасное руководство gevent для практикующего python-разработчика, в котором описана не только работа с ней, но и что такое конкурентность в общем понимании. Мне настолько понравилось та статья, что я решил использовать её как шаблон для написания введения в asyncio.
Небольшой дисклеймер — это статья не gevent vs asyncio. Nathan Road уже сделал это за меня в своей заметке. Все примеры вы можете найти на GitHub.
Я знаю, вам уже не терпится писать код, но для начала я бы хотел рассмотреть несколько концепций, которые нам пригодятся в дальнейшем.
Потоки, циклы событий, корутины и футуры
Потоки — наиболее распространённый инструмент. Думаю, вы слышали о нём и ранее, однако asyncio оперирует несколько другими понятиями: циклы событий, корутины и футуры.
- цикл событий (event loop) по большей части всего лишь управляет выполнением различных задач: регистрирует поступление и запускает в подходящий момент — специальные функции, похожие на генераторы python, от которых ожидают (await), что они будут отдавать управление обратно в цикл событий. Необходимо, чтобы они были запущены именно через цикл событий — объекты, в которых хранится текущий результат выполнения какой-либо задачи. Это может быть информация о том, что задача ещё не обработана или уже полученный результат; а может быть вообще исключение
Синхронное и асинхронное выполнение
В видео «Конкурентность — это не параллелизм, это лучше» Роб Пайк обращает ваше внимание на ключевую вещь. Разбиение задач на конкурентные подзадачи возможно только при таком параллелизме, когда он же и управляет этими подзадачами.
Asyncio делает тоже самое — вы можете разбивать ваш код на процедуры, которые определять как корутины, что даёт возможность управлять ими как пожелаете, включая и одновременное выполнение. Корутины содержат операторы yield, с помощью которых мы определяем места, где можно переключиться на другие ожидающие выполнения задачи.
За переключение контекста в asyncio отвечает yield, который передаёт управление обратно в event loop, а тот в свою очередь — к другой корутине. Рассмотрим базовый пример:
* Сначала мы объявили пару простейших корутин, которые притворяются неблокирующими, используя sleep из asyncio
* Корутины могут быть запущены только из другой корутины, или обёрнуты в задачу с помощью create_task
* После того, как у нас оказались 2 задачи, объединим их, используя wait
* И, наконец, отправим на выполнение в цикл событий через run_until_complete
Используя await в какой-либо корутине, мы таким образом объявляем, что корутина может отдавать управление обратно в event loop, который, в свою очередь, запустит какую-либо следующую задачу: bar. В bar произойдёт тоже самое: на await asyncio.sleep управление будет передано обратно в цикл событий, который в нужное время вернётся к выполнению foo.
Представим 2 блокирующие задачи: gr1 и gr2, как будто они обращаются к неким сторонним сервисам, и, пока они ждут ответа, третья функция может работать асинхронно.
Обратите внимание как происходит работа с вводом-выводом и планированием выполнения, позволяя всё это уместить в один поток. Пока две задачи заблокированы ожиданием I/O, третья функция может занимать всё процессорное время.
Порядок выполнения
В синхронном мире мы мыслим последовательно. Если у нас есть список задач, выполнение которых занимает разное время, то они завершатся в том же порядке, в котором поступили в обработку. Однако, в случае конкурентности нельзя быть в этом уверенным.
Разумеется, ваш результат будет иным, поскольку каждая задача будет засыпать на случайное время, но заметьте, что результат выполнения полностью отличается, хотя мы всегда ставим задачи в одном и том же порядке.
Также обратите внимание на корутину для нашей довольно простой задачи. Это важно для понимания, что в asyncio нет никакой магии при реализации неблокирующих задач. Во время реализации asyncio стоял отдельно в стандартной библиотеке, т.к. остальные модули предоставляли только блокирующую функциональность. Вы можете использовать модуль concurrent.futures для оборачивания блокирующих задач в потоки или процессы и получения футуры для использования в asyncio. Несколько таких примеров доступны на GitHub.
Это, наверно, главный недостаток сейчас при использовании asyncio, однако уже есть несколько библиотек, помогающих решить эту проблему.
Самая популярная блокирующая задача — получение данных по HTTP-запросу. Рассмотрим работу с великолепной библиотекой aiohttp на примере получения информации о публичных событиях на GitHub.
Тут стоит обратить внимание на пару моментов.
Во-первых, разница во времени — при использовании асинхронных вызовов мы запускаем запросы одновременно. Как говорилось ранее, каждый из них передавал управление следующему и возвращал результат по завершении. То есть скорость выполнения напрямую зависит от времени работы самого медленного запроса, который занял как раз 0.54 секунды. Круто, правда?
Во-вторых, насколько код похож на синхронный. Это же по сути одно и то же! Основные отличия связаны с реализацией библиотеки для выполнения запросов, созданием и ожиданием завершения задач.
Создание конкурентности
До сих пор мы использовали единственный метод создания и получения результатов из корутин, создания набора задач и ожидания их завершения. Однако, корутины могут быть запланированы для запуска и получения результатов несколькими способами. Представьте ситуацию, когда нам надо обрабатывать результаты GET-запросов по мере их получения; на самом деле реализация очень похожа на предыдущую:
Посмотрите на отступы и тайминги — мы запустили все задачи одновременно, однако они обработаны в порядке завершения выполнения. Код в данном случае немного отличается: мы пакуем корутины, каждая из которых уже подготовлена для выполнения, в список. Функция as_completed возвращает итератор, который выдаёт результаты корутин по мере их выполнения. Круто же, правда?! Кстати, и as_completed, и wait — функции из пакета concurrent.futures.
Ещё один пример — что если вы хотите узнать свой IP адрес. Есть куча сервисов для этого, но вы не знаете какой из них будет доступен в момент работы программы. Вместо того, чтобы последовательно опрашивать каждый из списка, можно запустить все запросы конкурентно и выбрать первый успешный.
Что ж, для этого в нашей любимой функции wait есть специальный параметр return_when. До сих пор мы игнорировали то, что возвращает wait, т.к. только распараллеливали задачи. Но теперь нам надо получить результат из корутины, так что будем использовать набор футур done и pending.
Что же случилось? Первый сервис ответил успешно, но в логах какое-то предупреждение!
На самом деле мы запустили выполнение двух задач, но вышли из цикла уже после первого результата, в то время как вторая корутина ещё выполнялась. Asyncio подумал что это баг и предупредил нас. Наверно, стоит прибираться за собой и явно убивать ненужные задачи. Как? Рад, что вы спросили.
Состояния футур
- ожидание (pending)
- выполнение (running)
- выполнено (done)
- отменено (cancelled)
Вы можете узнать состояние футуры с помощью методов done, cancelled или running, но не забывайте, что в случае done вызов result может вернуть как ожидаемый результат, так и исключение, которое возникло в процессе работы. Для отмены выполнения футуры есть метод cancel. Это подходит для исправления нашего примера.
Простой и аккуратный вывод — как раз то, что я люблю!
Если вам нужна некоторая дополнительная логика по обработке футур, то вы можете подключать коллбэки, которые будут вызваны при переходе в состояние done. Это может быть полезно для тестов, когда некоторые результаты надо переопределить какими-то своими значениями.
Обработка исключений
asyncio — это целиком про написание управляемого и читаемого конкурентного кода, что хорошо заметно при обработке исключений. Вернёмся к примеру, чтобы продемонстрировать.
Допустим, мы хотим убедиться, что все запросы к сервисам по определению IP вернули одинаковый результат. Однако, один из них может быть оффлайн и не ответить нам. Просто применим try. except как обычно:
Мы также можем обработать исключение, которое возникло в процессе выполнения корутины:
Точно также, как и запуск задачи без ожидания её завершения является ошибкой, так и получение неизвестных исключений оставляет свои следы в выводе:
Вывод выглядит также, как и в предыдущем примере за исключением укоризненного сообщения от asyncio.
Таймауты
А что, если информация о нашем IP не так уж важна? Это может быть хорошим дополнением к какому-то составному ответу, в котором эта часть будет опциональна. В таком случае не будем заставлять пользователя ждать. В идеале мы бы ставили таймаут на вычисление IP, после которого в любом случае отдавали ответ пользователю, даже без этой информации.
И снова у wait есть подходящий аргумент:
Я также добавил аргумент timeout к строке запуска скрипта, чтобы проверить что же произойдёт, если запросы успеют обработаться. Также я добавил случайные задержки, чтобы скрипт не завершался слишком быстро, и было время разобраться как именно он работает.
Заключение
Asyncio укрепил мою и так уже большую любовь к python. Если честно, я влюбился в сопрограммы, ещё когда познакомился с ними в Tornado, но asyncio сумел взять всё лучшее из него и других библиотек по реализации конкурентности. Причём настолько, что были предприняты особые усилия, чтобы они могли использовать основной цикл ввода-вывода. Так что если вы используете Tornado или Twisted, то можете подключать код, предназначенный для asyncio!
Как я уже упоминал, основная проблема заключается в том, что стандартные библиотеки пока ещё не поддерживают неблокирующее поведение. Также и многие популярные библиотеки работают пока лишь в синхронном стиле, а те, что используют конкурентность, пока ещё молоды и экспериментальны. Однако, их число растёт.
Надеюсь, в этом уроке я показал, насколько приятно работать с asyncio, и эта технология подтолкнёт вас к переходу на python 3, если вы по какой-то причине застряли на python 2.7. Одно точно — будущее Python полностью изменилось.
concurrent.futures — Запуск параллельных задач¶
Модуль concurrent.futures предоставляет высокоуровневый интерфейс для асинхронного выполнения вызовов.
Асинхронное выполнение можно выполняться потоками, используя ThreadPoolExecutor или отдельными процессами, используя ProcessPoolExecutor . Оба реализуют один и тот же интерфейс, определяемый абстрактным классом Executor .
Объекты Executor¶
Абстрактный класс, предоставляющий методы для асинхронного выполнения вызовов. Его следует использовать не напрямую, а через его подклассы.
submit ( fn, *args, **kwargs ) ¶
Планирование выполнения вызываемого fn как fn(*args **kwargs) и возвращает объект Future , представляющего выполнение вызываемого:
- iterables собираются немедленно, а не лениво;
- func выполняется асинхронно, и несколько вызовов func могут выполняться одновременно.
Если func вызов вызывает исключение, то это исключение возникает при получении его значения из итератора.
При использовании ProcessPoolExecutor , метод перемалывает iterables в чанки, которые он передаёт в пул как отдельные задачи. (Приблизительный) размер этих чанков можно задать, установив для параметра chunksize положительное целое число. Для очень длинных итераторов использование большого значения для chunksize может значительно повысить производительность по сравнению с размером по умолчанию 1. С ThreadPoolExecutor chunksize не имеет эффекта.
Изменено в версии 3.5: Добавлен аргумент chunksize.
shutdown ( wait=True ) ¶
Сообщает исполнителю, что он должен освободить ресурсы, которые он использует при выполнении текущей ожидающих футуры. Вызовы Executor.submit() и Executor.map() , сделанные после завершения работы, вызовут RuntimeError .
Если wait будет True тогда, то этот метод не возвращает пока все ожидающие футуры не выполнятся и ресурсы, связанные с исполнителем, не будут освобождены. Если wait будет False тогда, то этот метод немедленно возвращает и ресурсы, связанные с исполнителем, будут освобождены, когда все ожидающие футуры завершат выполнение. Независимо от значения wait, вся программа Python не завершит работу, пока не будут выполнены все ожидающие футуры.
Можно избежать необходимости явного вызова этого метода, если используется оператор with , которая отключает Executor (ожидание, как если бы Executor.shutdown() вызывались с wait, установленным в True ):
ThreadPoolExecutor¶
ThreadPoolExecutor — это подкласс Executor , использующий пул потоков для асинхронного выполнения вызовов. Могут возникать взаимоблокировки, когда связанный с Future вызываемый объект, ожидает результатов другого Future . Например:
Подкласс Executor , использующий пул из не более max_workers потоков для асинхронного выполнения вызовов.
initializer является необязательным вызываемым элементом, вызываемым в начале каждого рабочего потока; initargs является кортежем аргументов, переданных инициализатору. Если initializer создаёт исключение, все текущие ожидающие задания будут создавать BrokenThreadPool , а также любые попытки отправить больше заданий в пул.
Изменено в версии 3.5: Если max_workers значение None или не указано, то по умолчанию будет установлено количество процессоров на машине, умноженное на 5 , при условии, что ThreadPoolExecutor часто используется перекрывая I/O вместо работы CPU и количество воркеров должно быть выше числа воркеров для ProcessPoolExecutor .
Добавлено в версии 3.6: Добавлен аргумент thread_name_prefix, позволяющий пользователям управлять именами threading.Thread для рабочих потоков, созданных пулом для упрощения отладки.
Изменено в версии 3.7: Добавлены аргументы initializer и initargs.
Изменено в версии 3.8: Значение по умолчанию max_workers изменяется на min(32, os.cpu_count() + 4) . Это значение по умолчанию сохраняет не менее 5 воркеров для I/O связанных задач. Он использует не более 32 ядер CPU для задач, связанных с CPU, которые освобождают GIL. И это позволяет избежать неявного использования очень больших ресурсов на многоядерных машинах.
ThreadPoolExecutor теперь повторно использует неактивные рабочие потоки перед запуском max_workers рабочих потоков.
Пример ThreadPoolExecutor¶
ProcessPoolExecutor¶
Класс ProcessPoolExecutor — это подкласс Executor , использующий пул процессов для асинхронного выполнения вызовов. ProcessPoolExecutor использует модуль multiprocessing , позволяющий ему выполнять обходные шаги глобальной блокировки интерпретатора , но также означает, что могут быть выполнены и возвращены только picklable объекты.
Модуль __main__ должен быть импортирован подпроцессами воркера. Это значит, что ProcessPoolExecutor не будет работать в интерактивном интерпретатор.
Вызов Executor или Future метода из вызываемого, переданного ProcessPoolExecutor , приведет к взаимоблокировке.
class concurrent.futures. ProcessPoolExecutor ( max_workers=None, mp_context=None, initializer=None, initargs=() ) ¶
Подкласс Executor , выполняющий вызовы асинхронно, используя пул не более max_workers процессов. Если max_workers — None или не задано, по умолчанию будет использоваться количество процессоров на машине. Если max_workers меньше или равно 0 , то будет вызвано ValueError . В Windows max_workers должен быть меньше или равен 61 . Если это не так, то будет вызвано ValueError . Если max_workers — это None , то по умолчанию будет выбрано самое большее 61 , даже если доступно больше процессоров. mp_context может быть контекстом многопроцессорности или None. Он будет использоваться для запуска рабочих. Если mp_context равен None или не задан, используется контекст многопроцессорной обработки по умолчанию.
initializer — необязательный вызываемый объект, вызываемый в начале каждого рабочего процесса; initargs — кортеж аргументов, переданных инициализатору. Если initializer вызовет исключение, все текущие ожидающие задания вызовут BrokenProcessPool , а также любые попытки отправить больше заданий в пул.
Изменено в версии 3.3: При резком завершении одного из рабочих процессов возникает ошибка BrokenProcessPool . Ранее поведение не определялось, но операции с исполнителем или его футуры часто замораживалась или дедлочилась.
Изменено в версии 3.7: Добавлен аргумент mp_context, позволяющий пользователям управлять start_method для рабочих процессов, созданных пулом.
Добавлены аргументы initializer и initargs.
Пример ProcessPoolExecutor¶
Объекты Future¶
Модуль Future класс инкапсулирует асинхронное выполнение вызываемого объекта. Future сущности создаются Executor.submit() .
class concurrent.futures. Future ¶
Инкапсулирует асинхронное выполнение вызываемого объекта. Future сущности создаются Executor.submit() и не должны создаваться непосредственно, кроме тестирования.
cancel ( ) ¶
Попытка отменить вызов. Если вызов выполняется или завершен и не может быть отменен, метод возвращает False , в противном случае вызов отменяется, а метод возвращает True .
cancelled ( ) ¶
Если вызов был успешно отменен, возвращает значение True.
running ( ) ¶
Возвращает значение True , если вызов выполняется и не может быть отменен.
done ( ) ¶
Если вызов был успешно отменён или завершён, вернёт значение True.
result ( timeout=None ) ¶
Возвращает значение, возвращенное вызовом. Если вызов еще не завершен, этот метод будет ждать до timeout секунд. Если вызов не был завершен в течение timeout секунд, то возникает concurrent.futures.TimeoutError . timeout может быть int или float. Если timeout не указан или отсутствует, время ожидания не ограничено.
Если футура будет отменена до завершения, то будет вызвано CancelledError .
В случае вызова это метод вызовет такое же исключение.
exception ( timeout=None ) ¶
Возвращает исключение, вызванное вызовом. Если вызов ещё не завершён, этот метод будет ждать до timeout секунд. Если вызов не был завершен в течение timeout секунд, то возникает concurrent.futures.TimeoutError . timeout может быть int или float. Если timeout не указан или отсутствует, время ожидания не ограничено.
Если футура будет отменено до завершения, то будет вызвано CancelledError .
Если вызов завершён без вызова, возвращается «Нет».
add_done_callback ( fn ) ¶
Прикрепляет вызываемый fn к футуре. fn будет вызван с футурой в качестве единственного аргумента, когда футура будет отменена или закончена.
Добавленные вызываемые объекты вызываются в том порядке, в котором они были добавлены, и всегда вызываются в потоке, принадлежащем процессу, их добавившему. Если вызываемый объект вызывает подкласс Exception , он регистрируется и игнорируется. Если вызываемый объект вызывает подкласс BaseException , поведение не определено.
Если футура уже завершилась или отменена, будет немедленно вызвана fn.
Следующие Future методы предназначается для использования в юнит тестах и реализаций Executor .
set_running_or_notify_cancel ( ) ¶
Метод должен вызываться только реализациями Executor перед выполнением работы, связанной с Future и юнит тестами.
Если метод возвращает False тогда, Future был отменен, т.е. Future.cancel() вызван и возвращён True . Все потоки, ожидающие завершения Future (т.е. через as_completed() или wait() ), будут пробуждены.
Если метод возвращает True , то Future не был отменён и переведён в рабочее состояние, т.е. вызовы Future.running() вернут True .
Метод может вызываться только один раз и не может быть вызван после вызова Future.set_result() или Future.set_exception() .
set_result ( result ) ¶
Задаёт для результата работы, связанной с Future , значение result.
Данный метод должен использоваться только Executor реализациями и юнит тестами.
Изменено в версии 3.8: Метод вызывает concurrent.futures.InvalidStateError , если Future уже выполнена.
set_exception ( exception ) ¶
Задаёт для результата работы, связанной с Future , значение Exception исключения.
Метод должен быть использован только Executor реализациями и юнит тестами.
Изменено в версии 3.8: Данный метод вызывает concurrent.futures.InvalidStateError , если Future уже выполнилась.
Функции модуля¶
Дожидается завершения Future сущности (возможно, созданного различными экземплярами Executor ), заданного fs. Возвращает именованный 2-кортеж множеств. Первое множество, названное выполненное , содержит футуры, которые завершились (завершенные или отменённые футуры), прежде чем ожидание завершилось. Второе множество , названное не_выполненные , содержит футуры, которые не завершены (отложенные или запущенные футуры).
timeout используется для управления максимальным количеством секунд ожидания перед возвращением. timeout может быть int или float. Если timeout не указан или отсутствует, время ожидания не ограничено.
return_when указывает, когда функция должна вернуться. Она должна быть одной из следующих констант:
| Константа | Описание |
|---|---|
| FIRST_COMPLETED | Функция возвращает, когда любая футура заканчивается или будет отменена. |
| FIRST_EXCEPTION | Функция возвращает, когда футура завершается вызвав исключение. Если в футуре нет исключения, оно эквивалентно ALL_COMPLETED . |
| ALL_COMPLETED | Функция возвращает, когда все футуры завершены или будут отменены. |
concurrent.futures. as_completed ( fs, timeout=None ) ¶
Возвращает итератор по Future сущности (возможно, созданный различными экземплярами Executor ), заданному fs, который возвращает футуры по мере завершения (завершённые или аннулированные футуры). Все дублируемые футуры fs возвращаются один раз. Все футуры, завершенные до вызова as_completed() , будут получены первыми. Возвращенный итератор вызывает concurrent.futures.TimeoutError , если __next__() вызывается и результат недоступен через timeout секунд после исходного вызова as_completed() . timeout может быть int или float. Если timeout не указан или отсутствует, время ожидания не ограничено.
PEP 3148 — Футуры. Выполнение асинхронных вычислений. Предложение, определяющее эту функцию для включения в стандартную библиотеку Python.
Классы исключений¶
Возникает при отмене футуры.
exception concurrent.futures. TimeoutError ¶
Возникает, когда операция футуры превышает заданное время ожидания.
exception concurrent.futures. BrokenExecutor ¶
Полученный из RuntimeError , это исключение вызывает класс, когда исполнитель сломан по некоторым причинам и не может быть использован, чтобы представить или выполнить новые задачи.
Добавлено в версии 3.7.
Возникает при выполнении операции над футурой, которая не разрешена в текущем состояние.
Добавлено в версии 3.8.
Полученный из BrokenExecutor , это исключение вызывает класс, когда один из воркеров ThreadPoolExecutor провалил инициализацию.
Добавлено в версии 3.7.
Производное от BrokenExecutor (ранее RuntimeError ), это исключение класса возникает, когда один из воркеров ProcessPoolExecutor прекратил работу неочищенным способом (например, если он был убит снаружи).
Асинхронный Python: различные формы конкурентности
Это перевод статьи Абу Ашраф Маснуна «Async Python: The Different Forms of Concurrency».
С появлением Python 3 довольно много шума об «асинхронности» и «параллелизме», можно полагать, что Python недавно представил эти возможности/концепции. Но это не так. Мы много раз использовали эти операции. Кроме того, новички могут подумать, что asyncio является единственным или лучшим способом воссоздать и использовать асинхронные/параллельные операции. В этой статье мы рассмотрим различные способы достижения параллелизма, их преимущества и недостатки.
Определение терминов:
Прежде чем мы углубимся в технические аспекты, важно иметь некоторое базовое понимание терминов, часто используемых в этом контексте.
Синхронный и асинхронный:
В синхронных операциях задачи выполняются друг за другом. В асинхронных — задачи могут запускаться и завершаться независимо друг от друга. Одна асинхронная задача может запускаться и продолжать выполняться, пока выполнение переходит к новой задаче. Асинхронные задачи не блокируют (не заставляют ждать завершения выполнения задачи) операции и обычно выполняются в фоновом режиме.
Например, вы должны обратиться в туристическое агентство, чтобы спланировать свой следующий отпуск. Вам нужно отправить письмо своему руководителю, прежде чем улететь. В синхронном режиме, вы сначала позвоните в туристическое агентство, и если вас попросят подождать, то вы будете ждать, пока вам не ответят. Затем вы начнёте писать письмо руководителю. Таким образом, вы выполняете задачи последовательно, одна за другой.
Но, если вы умны, то пока вас попросили подождать, вы начнёте писать письмо, и когда с вами снова заговорят, вы приостановите написание, поговорите, а затем допишете письмо. Вы также можете попросить друга позвонить в агентство, а сами написать письмо. Это асинхронность, задачи не блокируют друг друга.
Конкурентность и параллелизм:
Конкурентность подразумевает, что две задачи выполняются совместно. В нашем предыдущем примере, когда мы рассматривали асинхронный пример, мы постепенно продвигались то в написании письма, то в разговоре с турагентством. Это конкурентность.
Когда мы попросили позвонить друга, а сами писали письмо, то задачи выполнялись параллельно.
Параллелизм по сути является формой конкурентности. Но параллелизм зависит от оборудования. Например, если в CPU только одно ядро, то две задачи не могут выполняться параллельно. Они просто делят процессорное время между собой. Тогда это конкурентность, но не параллелизм. Но когда у нас есть несколько ядер, мы можем выполнять несколько операций (в зависимости от количества ядер) одновременно.
- Синхронность: блокирует операции (блокирующие)
- Асинхронность: не блокирует операции (неблокирующие)
- Конкурентность: совместный прогресс (совместные)
- Параллелизм: параллельный прогресс (параллельные)
Параллелизм подразумевает конкурентность. Но конкурентность не всегда подразумевает параллелизм.
Потоки и процессы
Python поддерживает потоки уже очень давно. Потоки позволяют выполнять операции конкурентно. Но есть проблема, связанная с Global Interpreter Lock (GIL) из-за которой потоки не могли обеспечить настоящий параллелизм. И тем не менее, с появлением multiprocessing можно использовать несколько ядер с помощью Python.
Потоки (Threads)
Рассмотрим небольшой пример. В нижеследующем коде функция worker будет выполняться в нескольких потоках асинхронно и одновременно.
А вот пример выходных данных:
Таким образом мы запустили 5 потоков для совместной работы, и после их старта (т.е. после запуска функции worker) операция не ждёт завершения работы потоков прежде чем перейти к следующему оператору print. Это асинхронная операция.
В нашем примере мы передали функцию в конструктор Thread. Если бы мы хотели, то могли бы реализовать подкласс с методом (ООП стиль).
Global Interpreter Lock (GIL)
GIL нужен, чтобы сделать обработку памяти CPython проще и обеспечить наилучшую интеграцию с C.
GIL — это механизм блокировки, когда интерпретатор Python запускает в работу только один поток за раз. Это значит, только один поток может исполняться в байт-коде Python единовременно. GIL следит за тем, чтобы несколько потоков не выполнялись параллельно.
Краткие сведения о GIL:
- Одновременно может выполняться один поток.
- Интерпретатор Python переключается между потоками для достижения конкурентности.
- GIL применим к CPython (стандартной реализации). Но, например, Jython и IronPython не имеют GIL.
- GIL делает однопоточные программы быстрыми.
- Операциям ввода/вывода GIL обычно не мешает.
- GIL позволяет легко интегрировать непотокобезопасные библиотеки на C, благодаря GIL у нас есть много высокопроизводительных расширений/модулей, написанных на C.
- Для CPU-зависимых задач интерпретатор делает проверку каждые N тиков и переключает потоки. Таким образом один поток не блокирует другие.
Многие видят в GIL слабость. Я же рассматриваю это как благо, ведь были созданы такие библиотеки, как NumPy, SciPy, которые занимают особое, уникальное положение в научном обществе.
Процессы (Processes)
Чтобы достичь параллелизма, в Python был добавлен модуль multiprocessing, который предоставляет API и выглядит очень похожим, если вы использовали threading раньше.
Давайте просто пойдем и изменим предыдущий пример. Теперь модифицированная версия использует Процесс вместо Потока.
Что же изменилось? Я просто импортировал модуль multiprocessing вместо threading. А затем, вместо потока я использовал процесс. Вот и всё! Теперь вместо множества потоков мы используем процессы, которые запускаются на разных ядрах CPU (если, конечно, у вашего процессора несколько ядер).
С помощью класса Pool мы также можем распределить выполнение одной функции между несколькими процессами для разных входных значений.
Пример из официальных документов:
Здесь вместо того, чтобы перебирать список значений и вызывать функцию f по одному, мы фактически запускаем функцию в разных процессах.
Один процесс выполняет f(1), другой-f(2), а другой-f (3). Наконец, результаты снова объединяются в список. Это позволяет нам разбить тяжелые вычисления на более мелкие части и запускать их параллельно для более быстрого расчета.
Модуль concurrent.futures
Модуль concurrent.futures большой и позволяет писать асинхронный код очень легко. Мои любимчики — ThreadPoolExecutor и ProcessPoolExecutor. Эти исполнители поддерживают пул потоков или процессов. Мы отправляем наши задачи в пул, и он запускает задачи в доступном потоке / процессе. Возвращается объект Future, который можно использовать для запроса и получения результата по завершении задачи.
А вот пример ThreadPoolExecutor:
Asyncio — что, как и почему
У вас, вероятно, есть вопрос, который есть у многих людей в сообществе Python — что asyncio приносит нового? Зачем нужен был еще один способ асинхронного ввода-вывода? Разве у нас уже не было потоков и процессов?
Зачем нам нужен asyncio?
Процессы очень дорогостоящие и требуют много ресурсов для создания. Поэтому для операций ввода/вывода в основном выбираются потоки.
Мы знаем, что ввод-вывод зависит от внешних вещей — медленные диски или неприятные сетевые лаги делают ввод-вывод часто непредсказуемым. Теперь предположим, что мы используем потоки для операций ввода-вывода. 3 потока выполняют различные задачи ввода-вывода. Интерпретатор должен был бы переключаться между конкурентными потоками и давать каждому из них некоторое время по очереди.
Назовем потоки — T1, T2 и T3. Три потока начали свою операцию ввода-вывода. T3 завершает его первым. T2 и T1 все еще ожидают ввода-вывода. Интерпретатор Python переключается на T1, но он все еще ждет. Хорошо, интерпретатор перемещается в T2, а тот все еще ждет, а затем перемещается в T3, который готов и выполняет код. Вы видите в этом проблему?
T3 был готов, но интерпретатор сначала переключился между T2 и T1 — это понесло расходы на переключение, которых мы могли бы избежать, если бы интерпретатор сначала переключился на T3, верно?
Что есть asynio?
Asyncio предоставляет нам цикл событий наряду с другими крутыми вещами. Цикл событий (event loop) отслеживает события ввода/вывода и переключает задачи, которые готовы и ждут операции ввода/вывода.
Идея очень проста. Есть цикл обработки событий. И у нас есть функции, которые выполняют асинхронные операции ввода-вывода. Мы передаем свои функции циклу событий и просим его запустить их. Цикл событий возвращает нам объект Future, словно обещание, что в будущем мы что-то получим. Мы держимся за обещание, время от времени проверяем, имеет ли оно значение, и, наконец, когда значение получено, мы используем его в некоторых других операциях.
Как использовать asyncio?
Прежде чем мы начнём, давайте взглянем на пример:
Обратите внимание, что синтаксис async/await предназначен только для Python 3.5 и выше. Пройдёмся по коду:
- У нас есть асинхронная функция display_date, которая принимает число (в качестве идентификатора) и цикл обработки событий в качестве параметров.
- Функция имеет бесконечный цикл, который прерывается через 50 секунд. Но за этот период она неоднократно печатает время и делает паузу. Функция await может ожидать завершения выполнения других асинхронных функций (корутин).
- Передаем функцию в цикл обработки событий (используя метод ensure_future).
- Запускаем цикл событий.
Всякий раз, когда происходит вызов await, asyncio понимает, что функции, вероятно, потребуется некоторое время. Таким образом, он приостанавливает выполнение, начинает мониторинг любого связанного с ним события ввода-вывода и позволяет запускать задачи. Когда asyncio замечает, что приостановленный ввод-вывод функции готов, он возобновляет функцию.
Делаем правильный выбор
Только что мы прошлись по самым популярным формам конкурентности. Но остаётся вопрос — что следует выбрать?
Это зависит от вариантов использования. Из моего опыта я склонен следовать этому псевдо-коду:
Такие сложные материи, как асинхронность, мы проходим на обучении Рython