- Регистрация
- 14.05.16
- Сообщения
- 11.398
- Реакции
- 501
- Репутация
- 0
В этой статье я покажу как решить одну из проблем, возникающих при использовании распределенных очередей задач — регулирование пропускной способности очереди, или же, более простым языком, настройка ее rate limit'a. В качестве примера я возьму python и свою любимую связку Celery+RabbitMQ, хотя алгоритм, который я использую, никак не зависит от этих инструментов и может быть реализован на любом другом стэке.
So what's the problem?
Для начала пара слов о том, какую проблему я вообще пытаюсь решить. Дело в том, что 99.9% сервисов в интернете запрещают бесконтрольно закидывать их сотнями/тысячами запросов в секунду, угрожая дать в ответ какой-нибудь 403 или 500. Нет, ну правда, жалко им чтоле? Иногда таким сервисом может выступать даже своя собственная БД… Вобщем, доверять нынче нельзя никому, поэтому приходится себя как-то сдерживать.
Конечно, если вся работа ведется внутри 1го процесса, то никакой проблемы нет, но т.к мы работаем с Celery, то у нас может быть не только N процессов (далее воркеров), но и M машин, и задача все это дело синхронизировать уже не кажется столь тривиальной.
What's in the box
Первое, на что натыкаешься, когда ищешь, как же настроить throttling в celery, это встроенный параметр rate_limit класса Task. Звучит как то, что надо, но, копнув чуть глубже, замечаем, что:
# представим что у нас лимит на вызовы API гитхаба 60 req/min
# придется поделить вызовы поровну
@app.task(rate_limit='30/m')
def get_github_api1():
...
@app.task(rate_limit='30/m')
def get_github_api2():
...
Конечно, можно еще раз поделить лимит, теперь взяв в расчет еще и количество воркеров. Но все это начнет работать дико неэффективно, если таски будут прилетать неравномерно, например в какую-то минуту мы получим 60 вызовов get_github_api1() и 0 вызовов get_github_api2() — будут выполнены только 30 вызовов первого типа, хотя могли бы быть все 60. К тому же каждый раз, как появится новая таска, которой нужен доступ к этому ресурсу, придется снова везде пересчитывать все лимиты. Вобщем фича конечно полезная, но только для самых простых вариантов.
Bringing decision
Token Bucket
Решением проблемы для меня стал
В нашем же случае вместо пакета данных мы имеем таску, а хранилищем токенов будут выступать очереди RabbitMQ.
Wrting some code
Чтож, приступим к написанию кода. Создадим файл main.py и зададим базовые настройки:
from celery import Celery
from kombu import Queue
app = Celery('Test app', broker='amqp://guest@localhost//')
# 1 очередь под сами таски и 1 очередь под токены для них
app.conf.task_queues = [
Queue('github'),
# я ограничил длину очереди до 2ух, чтобы токены не скапливались
# иначе это может привести к пробою нашего rate limit'a
Queue('github_tokens', max_length=2)
]
# это таска будет играть роль нашего токена
# она никогда не будет запущена, мы просто будем забирать ее как сообщение из очереди
@app.task
def token():
return 1
# настраиваем постоянный выпуск нашего токена
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
# мы будем выпускать по 1му токену в секунду
# это значит что rate limit для очереди github - 60 задач в минуту
sender.add_periodic_task(1.0, token.signature(queue='github_tokens'))
Не забудьте развернуть Rabbit, я предпочитаю делать это 1ой строчкой докера:
docker run -d --rm --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq:3-management
Теперь запустим celery beat — это специальный воркер celery, запускаемый всегда в единственном экземпляре и отвечающий за запуск периодических задач.
celery -A main beat --loglevel=info
После этого в консоли раз в секунду начнут появляться сообщения:
[2020-03-22 22:49:00,992: INFO/MainProcess] Scheduler: Sending due task main.token() (main.token)
Отлично, мы наладили выпуск токенов для нашего 'ведра'. Осталось только научить наших воркеров из него брать. Попробуем оптимизировать код, который мы написали ранее для запросов в github. Добавим эти строчки к main.py:
# Напишем функцию для взятия токена из очереди
def rate_limit(task, task_group):
# берем соединение с брокером из пула
with task.app.connection_for_read() as conn:
# забираем токен
msg = conn.default_channel.basic_get(task_group+'_tokens', no_ack=True)
# получили None - очередь пуста, токенов нет
if msg is None:
# повторить таску через 1 сек
task.retry(countdown=1)
# Добавим print в таски для логирования
# Здесь я поставил max_retries=None, так что таски будут
# повторяться, пока не будут выполнены
@app.task(bind=True)
def get_github_api1(self, max_retries=None):
rate_limit(self, 'github')
print ('Called Api 1')
@app.task(bind=True)
def get_github_api2(self, max_retries=None):
rate_limit(self, 'github')
print ('Called Api 2')
А теперь проверим, как это все работает. В дополнение к уже запущенному beat добавим 8 воркеров:
celery -A main worker -с 8 -Q github
И создадим отдельный маленький скрипт для запуска этих задач, назовем его producer.py:
from main import get_github_api1, get_github_api2
tasks = [get_github_api1, get_github_api2]
for i in range(100):
# запускаю таски в перемешку
tasks[i % 2].apply_async(queue='github')
Запускаем — python producer.py, и смотрим в логи воркеров:
[2020-03-23 13:04:15,017: WARNING/ForkPoolWorker-3] Called Api 2
[2020-03-23 13:04:16,053: WARNING/ForkPoolWorker-8] Called Api 2
[2020-03-23 13:04:17,112: WARNING/ForkPoolWorker-1] Called Api 2
[2020-03-23 13:04:18,187: WARNING/ForkPoolWorker-1] Called Api 1
... (96 more lines)
Несмотря на то, что у нас целых 8 рабочих процессов, таски выполняются примерно раз в секунду, отправляясь в конец очереди, если на момент их выполнения не оказалось токена. Также, я думаю, вы уже заметили, что на самом деле мы накладываем rate limit не совсем на очередь, а скорее на какую-то логически связанную группу задач, которые на самом деле могут находится как в разных очередях, так и в одной. Таким образом наш контроль становится даже более детальным и гранулированным.
Putting it all together
Конечно, количество таких групп задач не ограничено (разве что возможностями брокера). Соберем весь код в кучку, расширим и причешим его:
from celery import Celery
from kombu import Queue
from queue import Empty
from functools import wraps
app = Celery('hello', broker='amqp://guest@localhost//')
task_queues = [
Queue('github'),
Queue('google')
]
# количество запусков в минуту
rate_limits = {
'github': 60,
'google': 100
}
# автоматически сгенерируем очереди с токенами под все группы, на которые нужен лимит
task_queues += [Queue(name+'_tokens', max_length=2) for name, limit in rate_limits.items()]
app.conf.task_queues = task_queues
@app.task
def token():
return 1
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
# автоматически настроим выпуск токенов с нужной скоростью
for name, limit in rate_limits.items():
sender.add_periodic_task(60 / limit, token.signature(queue=name+'_tokens'))
# Как можно не любить декораторы?
def rate_limit(task_group):
def decorator_func(func):
@wraps(func)
def function(self, *args, **kwargs):
with self.app.connection_for_read() as conn:
# тут я для примера использовал другой более высокоуровневый подход:
# в замен на получение полноценного интерфейса очереди
# мы немного теряем в перфомансе, т.к под капотом происходит обмен
# несколькими сообщениями с брокером
with conn.SimpleQueue(task_group+'_tokens', no_ack=True, queue_opts={'max_length':2}) as queue:
try:
# из плюсов также - наличие вот такого блокирующего вызова
# это может быть удобнее, чем постоянная ротация с retry()
# впрочем, это нужно подбирать под кейс
queue.get(block=True, timeout=5)
return func(self, *args, **kwargs)
except Empty:
self.retry(countdown=1)
return function
return decorator_func
# с декораторами все-таки намного красивее и читабельнее, согласитесь?
@app.task(bind=True, max_retries=None)
@rate_limit('github')
def get_github_api1(self):
print ('Called github Api 1')
@app.task(bind=True, max_retries=None)
@rate_limit('github')
def get_github_api2(self):
print ('Called github Api 2')
@app.task(bind=True, max_retries=None)
@rate_limit('google')
def query_google_api1(self):
print ('Called Google Api 1')
@app.task(bind=True, max_retries=None)
@rate_limit('google')
def query_google_api1(self):
print ('Called Google Api 2')
Таким образом суммарные вызовы задач группы google не превысят 100/мин, а группы github — 60/мин. Заметьте, что для того, чтобы настроить такой throttling, понадобилось меньше 50 строк. Как по мне, достаточно просто.
Moving further
Ну, вот все и работает как надо, причем без каких-либо сторонних примочек, средствами только самого брокера. Но зачем останавливаться на достигнутом ? Грамотно используя данный алгоритм, можно пойти дальше и создать намного более сложные и гибкие стратегии. Например, некоторые таски могут брать не 1, а несколько токенов (возможно даже из разных очередей, если обращение идет к нескольким сервисам), таким образом у нас появится понятие 'веса' задачи, или же расширить размер нашего 'ведра' токенов, позволив им накапливаться, тем самым компенсируя периоды простоя. Вобщем, пространство для маневра просто огромное и ограничено только вашим воображением и инженерными навыками) Всем спасибо, всем удачи!
P.s. Поделитесь кто как решал подобную проблему, будет интересно услышать
So what's the problem?
Для начала пара слов о том, какую проблему я вообще пытаюсь решить. Дело в том, что 99.9% сервисов в интернете запрещают бесконтрольно закидывать их сотнями/тысячами запросов в секунду, угрожая дать в ответ какой-нибудь 403 или 500. Нет, ну правда, жалко им чтоле? Иногда таким сервисом может выступать даже своя собственная БД… Вобщем, доверять нынче нельзя никому, поэтому приходится себя как-то сдерживать.
Конечно, если вся работа ведется внутри 1го процесса, то никакой проблемы нет, но т.к мы работаем с Celery, то у нас может быть не только N процессов (далее воркеров), но и M машин, и задача все это дело синхронизировать уже не кажется столь тривиальной.
What's in the box
Первое, на что натыкаешься, когда ищешь, как же настроить throttling в celery, это встроенный параметр rate_limit класса Task. Звучит как то, что надо, но, копнув чуть глубже, замечаем, что:
Нельзя задать rate limit на группу задач.
Это неудобно, т.к зачастую доступ к какому-то лимитированому ресурсу размазан между разными тасками.
Это неудобно, т.к зачастую доступ к какому-то лимитированому ресурсу размазан между разными тасками.
# представим что у нас лимит на вызовы API гитхаба 60 req/min
# придется поделить вызовы поровну
@app.task(rate_limit='30/m')
def get_github_api1():
...
@app.task(rate_limit='30/m')
def get_github_api2():
...
Этот лимит работает только внутри воркера, то есть он локальный и у каждого воркера свой.
Конечно, можно еще раз поделить лимит, теперь взяв в расчет еще и количество воркеров. Но все это начнет работать дико неэффективно, если таски будут прилетать неравномерно, например в какую-то минуту мы получим 60 вызовов get_github_api1() и 0 вызовов get_github_api2() — будут выполнены только 30 вызовов первого типа, хотя могли бы быть все 60. К тому же каждый раз, как появится новая таска, которой нужен доступ к этому ресурсу, придется снова везде пересчитывать все лимиты. Вобщем фича конечно полезная, но только для самых простых вариантов.
Bringing decision
Token Bucket
Решением проблемы для меня стал
You must be registered for see links
— алгоритм, использующийся для контроля полосы пропускания канала в компьютерных и телекомуникационных сетях. Опишу его в 2ух словах: пакет данных, чтобы пройти проверку канала на лимит, должен иметь при себе токен, который он взял из хранилища; в то же время в хранилище токены поступают с некоторой частотой. То есть пропускная способоность канала ограничивается скоростью выпуска токенов, которую нам и надо регулировать.В нашем же случае вместо пакета данных мы имеем таску, а хранилищем токенов будут выступать очереди RabbitMQ.
Wrting some code
Чтож, приступим к написанию кода. Создадим файл main.py и зададим базовые настройки:
from celery import Celery
from kombu import Queue
app = Celery('Test app', broker='amqp://guest@localhost//')
# 1 очередь под сами таски и 1 очередь под токены для них
app.conf.task_queues = [
Queue('github'),
# я ограничил длину очереди до 2ух, чтобы токены не скапливались
# иначе это может привести к пробою нашего rate limit'a
Queue('github_tokens', max_length=2)
]
# это таска будет играть роль нашего токена
# она никогда не будет запущена, мы просто будем забирать ее как сообщение из очереди
@app.task
def token():
return 1
# настраиваем постоянный выпуск нашего токена
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
# мы будем выпускать по 1му токену в секунду
# это значит что rate limit для очереди github - 60 задач в минуту
sender.add_periodic_task(1.0, token.signature(queue='github_tokens'))
Не забудьте развернуть Rabbit, я предпочитаю делать это 1ой строчкой докера:
docker run -d --rm --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq:3-management
Теперь запустим celery beat — это специальный воркер celery, запускаемый всегда в единственном экземпляре и отвечающий за запуск периодических задач.
celery -A main beat --loglevel=info
После этого в консоли раз в секунду начнут появляться сообщения:
[2020-03-22 22:49:00,992: INFO/MainProcess] Scheduler: Sending due task main.token() (main.token)
Отлично, мы наладили выпуск токенов для нашего 'ведра'. Осталось только научить наших воркеров из него брать. Попробуем оптимизировать код, который мы написали ранее для запросов в github. Добавим эти строчки к main.py:
# Напишем функцию для взятия токена из очереди
def rate_limit(task, task_group):
# берем соединение с брокером из пула
with task.app.connection_for_read() as conn:
# забираем токен
msg = conn.default_channel.basic_get(task_group+'_tokens', no_ack=True)
# получили None - очередь пуста, токенов нет
if msg is None:
# повторить таску через 1 сек
task.retry(countdown=1)
# Добавим print в таски для логирования
# Здесь я поставил max_retries=None, так что таски будут
# повторяться, пока не будут выполнены
@app.task(bind=True)
def get_github_api1(self, max_retries=None):
rate_limit(self, 'github')
print ('Called Api 1')
@app.task(bind=True)
def get_github_api2(self, max_retries=None):
rate_limit(self, 'github')
print ('Called Api 2')
А теперь проверим, как это все работает. В дополнение к уже запущенному beat добавим 8 воркеров:
celery -A main worker -с 8 -Q github
И создадим отдельный маленький скрипт для запуска этих задач, назовем его producer.py:
from main import get_github_api1, get_github_api2
tasks = [get_github_api1, get_github_api2]
for i in range(100):
# запускаю таски в перемешку
tasks[i % 2].apply_async(queue='github')
Запускаем — python producer.py, и смотрим в логи воркеров:
[2020-03-23 13:04:15,017: WARNING/ForkPoolWorker-3] Called Api 2
[2020-03-23 13:04:16,053: WARNING/ForkPoolWorker-8] Called Api 2
[2020-03-23 13:04:17,112: WARNING/ForkPoolWorker-1] Called Api 2
[2020-03-23 13:04:18,187: WARNING/ForkPoolWorker-1] Called Api 1
... (96 more lines)
Несмотря на то, что у нас целых 8 рабочих процессов, таски выполняются примерно раз в секунду, отправляясь в конец очереди, если на момент их выполнения не оказалось токена. Также, я думаю, вы уже заметили, что на самом деле мы накладываем rate limit не совсем на очередь, а скорее на какую-то логически связанную группу задач, которые на самом деле могут находится как в разных очередях, так и в одной. Таким образом наш контроль становится даже более детальным и гранулированным.
Putting it all together
Конечно, количество таких групп задач не ограничено (разве что возможностями брокера). Соберем весь код в кучку, расширим и причешим его:
from celery import Celery
from kombu import Queue
from queue import Empty
from functools import wraps
app = Celery('hello', broker='amqp://guest@localhost//')
task_queues = [
Queue('github'),
Queue('google')
]
# количество запусков в минуту
rate_limits = {
'github': 60,
'google': 100
}
# автоматически сгенерируем очереди с токенами под все группы, на которые нужен лимит
task_queues += [Queue(name+'_tokens', max_length=2) for name, limit in rate_limits.items()]
app.conf.task_queues = task_queues
@app.task
def token():
return 1
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
# автоматически настроим выпуск токенов с нужной скоростью
for name, limit in rate_limits.items():
sender.add_periodic_task(60 / limit, token.signature(queue=name+'_tokens'))
# Как можно не любить декораторы?
def rate_limit(task_group):
def decorator_func(func):
@wraps(func)
def function(self, *args, **kwargs):
with self.app.connection_for_read() as conn:
# тут я для примера использовал другой более высокоуровневый подход:
# в замен на получение полноценного интерфейса очереди
# мы немного теряем в перфомансе, т.к под капотом происходит обмен
# несколькими сообщениями с брокером
with conn.SimpleQueue(task_group+'_tokens', no_ack=True, queue_opts={'max_length':2}) as queue:
try:
# из плюсов также - наличие вот такого блокирующего вызова
# это может быть удобнее, чем постоянная ротация с retry()
# впрочем, это нужно подбирать под кейс
queue.get(block=True, timeout=5)
return func(self, *args, **kwargs)
except Empty:
self.retry(countdown=1)
return function
return decorator_func
# с декораторами все-таки намного красивее и читабельнее, согласитесь?
@app.task(bind=True, max_retries=None)
@rate_limit('github')
def get_github_api1(self):
print ('Called github Api 1')
@app.task(bind=True, max_retries=None)
@rate_limit('github')
def get_github_api2(self):
print ('Called github Api 2')
@app.task(bind=True, max_retries=None)
@rate_limit('google')
def query_google_api1(self):
print ('Called Google Api 1')
@app.task(bind=True, max_retries=None)
@rate_limit('google')
def query_google_api1(self):
print ('Called Google Api 2')
Таким образом суммарные вызовы задач группы google не превысят 100/мин, а группы github — 60/мин. Заметьте, что для того, чтобы настроить такой throttling, понадобилось меньше 50 строк. Как по мне, достаточно просто.
Moving further
Ну, вот все и работает как надо, причем без каких-либо сторонних примочек, средствами только самого брокера. Но зачем останавливаться на достигнутом ? Грамотно используя данный алгоритм, можно пойти дальше и создать намного более сложные и гибкие стратегии. Например, некоторые таски могут брать не 1, а несколько токенов (возможно даже из разных очередей, если обращение идет к нескольким сервисам), таким образом у нас появится понятие 'веса' задачи, или же расширить размер нашего 'ведра' токенов, позволив им накапливаться, тем самым компенсируя периоды простоя. Вобщем, пространство для маневра просто огромное и ограничено только вашим воображением и инженерными навыками) Всем спасибо, всем удачи!
P.s. Поделитесь кто как решал подобную проблему, будет интересно услышать