- Регистрация
- 21.07.20
- Сообщения
- 40.408
- Реакции
- 1
- Репутация
- 0
Как я дошёл до жизни такой?
Не так давно мне пришлось работать над бэкендом высоко нагруженного проекта, в котором нужно было организовать регулярное выполнение большого количества фоновых задач со сложными вычислениями и запросами на сторонние сервисы. Проект асинхронный и до того, как я пришёл, в нём был простой механизм крон-запуска задач: цикл с проверкой текущего времени и запуск групп корутин через gather — такой подход оказался приемлем до момента, пока таких корутин были десятки и сотни, однако, когда их количество перевалило через две тысячи, пришлось думать об организации нормальной очереди задач с брокером, несколькими воркерами и прочим.
Сначала я решил опробовать Celery, которым пользовался ранее. В связи с асинхронностью проекта, я погрузился в вопрос и увидел
You must be registered for see links
, а так же
You must be registered for see links
, созданный автором статьи.Скажу так, проект очень интересный и вполне успешно работает в других приложениях нашей команды, да и сам автор говорит о том, что смог выкатить в прод, заюзав асинхронный пул. Но, к сожалению, мне это не очень подошло, так как обнаружилась
You must be registered for see links
с групповым запуском задач (см.
You must be registered for see links
). На момент написания статьи
You must be registered for see links
уже закрыта, однако, работа велась на протяжении месяца. В любом случае, автору удачи и всех благ, так как рабочие штуки на либе уже есть… в общем, дело во мне и для меня оказался инструмент сыроват. Вдобавок, в некоторых задачах было по 2-3 http-запроса к разным сервисам, таким образом даже при оптимизации задач мы создаём 4 тысячи tcp соединений, примерно каждые 2 часа — не очень… Хотелось бы создавать сессию на один тип задач при запуске воркеров. Чуть подробнее о большом кол-ве запросов через aiohttp
You must be registered for see links
.В связи с этим, я начал искать альтернативы и нашёл! Создателями celery, а конкретно, как я понял
You must be registered for see links
, была создана
You must be registered for see links
, изначально для проекта
You must be registered for see links
. Faust написана под впечатлением от Kafka Streams и работает с Kafka в качестве брокера, также для хранения результатов от работы агентов используется rocksdb, а самое главное — это то, что библиотека асинхронна.Также, можете глянуть
You must be registered for see links
celery и faust от создателей последней: их различия, различия брокеров, реализацию элементарной задачи. Всё весьма просто, однако, в faust привлекает внимание приятная особенность — типизированные данные для передачи в топик.Что будем делать?
Итак, в небольшой серии статей я покажу, как собирать данные в фоновых задачах с помощью Faust. Источником для нашего пример-проекта будет, как следует из названия,
You must be registered for see links
. Я продемонстрирую, как писать агентов (sink, топики, партиции), как делать регулярное (cron) выполнение, удобнейшие cli-комманды faust (обёртка над click), простой кластеринг, а в конце прикрутим datadog (работающий из коробки) и попытаемся, что-нибудь увидеть. Для хранения собранных данных будем использовать mongodb и motor для подключения.P.S. Судя по уверенности, с которой написан пункт про мониторинг, думаю, что читатель в конце последней статьи всё-таки будет выглядеть, как-то так:
Требования к проекту
В связи с тем, что я уже успел наобещать, составим небольшой списочек того, что должен уметь сервис:
- Выгружать ценные бумаги и overview по ним (в т.ч. прибыли и убытки, баланс, cash flow — за последний год) — регулярно
- Выгружать исторические данные (для каждого торгового года находить экстремумы цены закрытия торгов) — регулярно
- Выгружать последние торговые данные — регулярно
- Выгружать настроенный список индикаторов для каждой ценной бумаги — регулярно
Как полагается, выбираем имя проекту с потолка: horton
Готовим инфраструктуру
Заголовок конечно сильный, однако, всё что нужно сделать — это написать небольшой конфиг для docker-compose с kafka (и zookeeper — в одном контейнере), kafdrop (если нам захочется посмотреть сообщения в топиках), mongodb. Получаем [docker-compose.yml](
You must be registered for see links
) следующего вида:version: '3'
services:
db:
container_name: horton-mongodb-local
image: mongo:4.2-bionic
command: mongod --port 20017
restart: always
ports:
- 20017:20017
environment:
- MONGO_INITDB_DATABASE=horton
- MONGO_INITDB_ROOT_USERNAME=admin
- MONGO_INITDB_ROOT_PASSWORD=admin_password
kafka-service:
container_name: horton-kafka-local
image: obsidiandynamics/kafka
restart: always
ports:
- "2181:2181"
- "9092:9092"
environment:
KAFKA_LISTENERS: "INTERNAL://:29092,EXTERNAL://:9092"
KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka-service:29092,EXTERNAL://localhost:9092"
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "INTERNALLAINTEXT,EXTERNALLAINTEXT"
KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL"
KAFKA_ZOOKEEPER_SESSION_TIMEOUT: "6000"
KAFKA_RESTART_ATTEMPTS: "10"
KAFKA_RESTART_DELAY: "5"
ZOOKEEPER_AUTOPURGE_PURGE_INTERVAL: "0"
kafdrop:
container_name: horton-kafdrop-local
image: 'obsidiandynamics/kafdrop:latest'
restart: always
ports:
- '9000:9000'
environment:
KAFKA_BROKERCONNECT: kafka-service:29092
depends_on:
- kafka-service
Тут вообще ничего сложного. Для kafka объявили два listener'а: одного (internal) для использования внутри композной сети, а второго (external) для запросов из вне, поэтому пробросили его наружу. 2181 — порт zookeeper'а. По остальному, я думаю, ясно.
Готовим скелет проекта
В базовом варианте структура нашего проекта должна выглядеть так:
horton
├── docker-compose.yml
└── horton
├── agents.py *
├── alphavantage.py *
├── app.py *
├── config.py
├── database
│ ├── connect.py
│ ├── cruds
│ │ ├── base.py
│ │ ├── __init__.py
│ │ └── security.py *
│ └── __init__.py
├── __init__.py
├── records.py *
└── tasks.py *
*Всё что я отметил мы пока не трогаем, а просто создаём пустые файлы.**
Создали структуру. Теперь добавим необходимые зависимости, напишем конфиг и подключение к mongodb. Полный текст файлов приводить в статье не буду, чтобы не затягивать, а сделаю ссылки на нужные версии.
Начнём с зависимостей и мета о проекте —
You must be registered for see links
Далее, запускаем установку зависимостей и создание virtualenv (либо, можете сами создать папку venv и активировать окружение):
pip3 install poetry (если ещё не установлено)
poetry install
Теперь создадим
You must be registered for see links
— креды и куда стучаться. Сразу туда можно разместить и данные для alphavantage. Ну и переходим к
You must be registered for see links
— извлекаем данные для приложения из нашего конфига. Да, каюсь, заюзал свою либу —
You must be registered for see links
.По подключению с монго — совсем всё просто. Объявили
You must be registered for see links
для подключения и
You must be registered for see links
для крудов, чтобы проще было делать запросы по коллекциям.Что будет дальше?
Статья получилась не очень большая, так как здесь я говорю только о мотивации и подготовке, поэтому не обессудьте — обещаю, что в следующей части будет экшн и графика.
Итак, а в этой самой следующей части мы:
- Напишем небольшой клиентик для alphavantage на aiohttp с запросами на нужные нам эндпоинты.
- Сделаем агента, который будет собирать данные о ценных бумагах и исторические цены по ним.
You must be registered for see links
You must be registered for see links