НОВОСТИ [Перевод] Celery throttling — настраивам rate limit для очередей

BDFINFO2.0
Оффлайн
Регистрация
14.05.16
Сообщения
11.398
Реакции
501
Репутация
0
В этой статье я покажу как решить одну из проблем, возникающих при использовании распределенных очередей задач — регулирование пропускной способности очереди, или же, более простым языком, настройка ее rate limit'a. В качестве примера я возьму python и свою любимую связку Celery+RabbitMQ, хотя алгоритм, который я использую, никак не зависит от этих инструментов и может быть реализован на любом другом стэке.


eih-panacwyd8rxigfn7saopzbs.jpeg


So what's the problem?



Для начала пара слов о том, какую проблему я вообще пытаюсь решить. Дело в том, что 99.9% сервисов в интернете запрещают бесконтрольно закидывать их сотнями/тысячами запросов в секунду, угрожая дать в ответ какой-нибудь 403 или 500. Нет, ну правда, жалко им чтоле? Иногда таким сервисом может выступать даже своя собственная БД… Вобщем, доверять нынче нельзя никому, поэтому приходится себя как-то сдерживать.


Конечно, если вся работа ведется внутри 1го процесса, то никакой проблемы нет, но т.к мы работаем с Celery, то у нас может быть не только N процессов (далее воркеров), но и M машин, и задача все это дело синхронизировать уже не кажется столь тривиальной.


What's in the box



Первое, на что натыкаешься, когда ищешь, как же настроить throttling в celery, это встроенный параметр rate_limit класса Task. Звучит как то, что надо, но, копнув чуть глубже, замечаем, что:

Нельзя задать rate limit на группу задач.
Это неудобно, т.к зачастую доступ к какому-то лимитированому ресурсу размазан между разными тасками.​


# представим что у нас лимит на вызовы API гитхаба 60 req/min
# придется поделить вызовы поровну
@app.task(rate_limit='30/m')
def get_github_api1():
...

@app.task(rate_limit='30/m')
def get_github_api2():
...

Этот лимит работает только внутри воркера, то есть он локальный и у каждого воркера свой.​

Конечно, можно еще раз поделить лимит, теперь взяв в расчет еще и количество воркеров. Но все это начнет работать дико неэффективно, если таски будут прилетать неравномерно, например в какую-то минуту мы получим 60 вызовов get_github_api1() и 0 вызовов get_github_api2() — будут выполнены только 30 вызовов первого типа, хотя могли бы быть все 60. К тому же каждый раз, как появится новая таска, которой нужен доступ к этому ресурсу, придется снова везде пересчитывать все лимиты. Вобщем фича конечно полезная, но только для самых простых вариантов.

Bringing decision


Token Bucket



Решением проблемы для меня стал — алгоритм, использующийся для контроля полосы пропускания канала в компьютерных и телекомуникационных сетях. Опишу его в 2ух словах: пакет данных, чтобы пройти проверку канала на лимит, должен иметь при себе токен, который он взял из хранилища; в то же время в хранилище токены поступают с некоторой частотой. То есть пропускная способоность канала ограничивается скоростью выпуска токенов, которую нам и надо регулировать.
В нашем же случае вместо пакета данных мы имеем таску, а хранилищем токенов будут выступать очереди RabbitMQ.


z4ca3hrv549fh-gyoemrxeszpnm.png


Wrting some code



Чтож, приступим к написанию кода. Создадим файл main.py и зададим базовые настройки:


from celery import Celery
from kombu import Queue

app = Celery('Test app', broker='amqp://guest@localhost//')

# 1 очередь под сами таски и 1 очередь под токены для них
app.conf.task_queues = [
Queue('github'),
# я ограничил длину очереди до 2ух, чтобы токены не скапливались
# иначе это может привести к пробою нашего rate limit'a
Queue('github_tokens', max_length=2)
]

# это таска будет играть роль нашего токена
# она никогда не будет запущена, мы просто будем забирать ее как сообщение из очереди
@app.task
def token():
return 1

# настраиваем постоянный выпуск нашего токена
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
# мы будем выпускать по 1му токену в секунду
# это значит что rate limit для очереди github - 60 задач в минуту
sender.add_periodic_task(1.0, token.signature(queue='github_tokens'))


Не забудьте развернуть Rabbit, я предпочитаю делать это 1ой строчкой докера:


docker run -d --rm --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq:3-management


Теперь запустим celery beat — это специальный воркер celery, запускаемый всегда в единственном экземпляре и отвечающий за запуск периодических задач.


celery -A main beat --loglevel=info


После этого в консоли раз в секунду начнут появляться сообщения:


[2020-03-22 22:49:00,992: INFO/MainProcess] Scheduler: Sending due task main.token() (main.token)


Отлично, мы наладили выпуск токенов для нашего 'ведра'. Осталось только научить наших воркеров из него брать. Попробуем оптимизировать код, который мы написали ранее для запросов в github. Добавим эти строчки к main.py:


# Напишем функцию для взятия токена из очереди
def rate_limit(task, task_group):
# берем соединение с брокером из пула
with task.app.connection_for_read() as conn:
# забираем токен
msg = conn.default_channel.basic_get(task_group+'_tokens', no_ack=True)
# получили None - очередь пуста, токенов нет
if msg is None:
# повторить таску через 1 сек
task.retry(countdown=1)

# Добавим print в таски для логирования
# Здесь я поставил max_retries=None, так что таски будут
# повторяться, пока не будут выполнены
@app.task(bind=True)
def get_github_api1(self, max_retries=None):
rate_limit(self, 'github')
print ('Called Api 1')

@app.task(bind=True)
def get_github_api2(self, max_retries=None):
rate_limit(self, 'github')
print ('Called Api 2')


А теперь проверим, как это все работает. В дополнение к уже запущенному beat добавим 8 воркеров:


celery -A main worker -с 8 -Q github


И создадим отдельный маленький скрипт для запуска этих задач, назовем его producer.py:


from main import get_github_api1, get_github_api2

tasks = [get_github_api1, get_github_api2]

for i in range(100):
# запускаю таски в перемешку
tasks[i % 2].apply_async(queue='github')


Запускаем — python producer.py, и смотрим в логи воркеров:


[2020-03-23 13:04:15,017: WARNING/ForkPoolWorker-3] Called Api 2
[2020-03-23 13:04:16,053: WARNING/ForkPoolWorker-8] Called Api 2
[2020-03-23 13:04:17,112: WARNING/ForkPoolWorker-1] Called Api 2
[2020-03-23 13:04:18,187: WARNING/ForkPoolWorker-1] Called Api 1
... (96 more lines)


Несмотря на то, что у нас целых 8 рабочих процессов, таски выполняются примерно раз в секунду, отправляясь в конец очереди, если на момент их выполнения не оказалось токена. Также, я думаю, вы уже заметили, что на самом деле мы накладываем rate limit не совсем на очередь, а скорее на какую-то логически связанную группу задач, которые на самом деле могут находится как в разных очередях, так и в одной. Таким образом наш контроль становится даже более детальным и гранулированным.

Putting it all together



Конечно, количество таких групп задач не ограничено (разве что возможностями брокера). Соберем весь код в кучку, расширим и причешим его:


from celery import Celery
from kombu import Queue
from queue import Empty
from functools import wraps

app = Celery('hello', broker='amqp://guest@localhost//')

task_queues = [
Queue('github'),
Queue('google')
]

# количество запусков в минуту
rate_limits = {
'github': 60,
'google': 100
}

# автоматически сгенерируем очереди с токенами под все группы, на которые нужен лимит
task_queues += [Queue(name+'_tokens', max_length=2) for name, limit in rate_limits.items()]

app.conf.task_queues = task_queues

@app.task
def token():
return 1

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
# автоматически настроим выпуск токенов с нужной скоростью
for name, limit in rate_limits.items():
sender.add_periodic_task(60 / limit, token.signature(queue=name+'_tokens'))

# Как можно не любить декораторы?
def rate_limit(task_group):
def decorator_func(func):
@wraps(func)
def function(self, *args, **kwargs):
with self.app.connection_for_read() as conn:
# тут я для примера использовал другой более высокоуровневый подход:
# в замен на получение полноценного интерфейса очереди
# мы немного теряем в перфомансе, т.к под капотом происходит обмен
# несколькими сообщениями с брокером
with conn.SimpleQueue(task_group+'_tokens', no_ack=True, queue_opts={'max_length':2}) as queue:
try:
# из плюсов также - наличие вот такого блокирующего вызова
# это может быть удобнее, чем постоянная ротация с retry()
# впрочем, это нужно подбирать под кейс
queue.get(block=True, timeout=5)
return func(self, *args, **kwargs)
except Empty:
self.retry(countdown=1)
return function
return decorator_func

# с декораторами все-таки намного красивее и читабельнее, согласитесь? ;)
@app.task(bind=True, max_retries=None)
@rate_limit('github')
def get_github_api1(self):
print ('Called github Api 1')

@app.task(bind=True, max_retries=None)
@rate_limit('github')
def get_github_api2(self):
print ('Called github Api 2')

@app.task(bind=True, max_retries=None)
@rate_limit('google')
def query_google_api1(self):
print ('Called Google Api 1')

@app.task(bind=True, max_retries=None)
@rate_limit('google')
def query_google_api1(self):
print ('Called Google Api 2')


Таким образом суммарные вызовы задач группы google не превысят 100/мин, а группы github — 60/мин. Заметьте, что для того, чтобы настроить такой throttling, понадобилось меньше 50 строк. Как по мне, достаточно просто.

Moving further



Ну, вот все и работает как надо, причем без каких-либо сторонних примочек, средствами только самого брокера. Но зачем останавливаться на достигнутом ;)? Грамотно используя данный алгоритм, можно пойти дальше и создать намного более сложные и гибкие стратегии. Например, некоторые таски могут брать не 1, а несколько токенов (возможно даже из разных очередей, если обращение идет к нескольким сервисам), таким образом у нас появится понятие 'веса' задачи, или же расширить размер нашего 'ведра' токенов, позволив им накапливаться, тем самым компенсируя периоды простоя. Вобщем, пространство для маневра просто огромное и ограничено только вашим воображением и инженерными навыками) Всем спасибо, всем удачи!


P.s. Поделитесь кто как решал подобную проблему, будет интересно услышать ;)
 
Сверху Снизу