НОВОСТИ [Из песочницы] Разработка модели в PySpark ML на датасете с разными типами данных для ржавых чайников

NewsBot
Оффлайн

NewsBot

.
.
Регистрация
21.07.20
Сообщения
40.408
Реакции
1
Репутация
0
А ты уже умеешь работать с несколькими типами данных в PySpark ML? Нет? Тогда тебе срочно нужно к нам.

cattouchretcr


Всем привет! Хочу раскрыть подробно одну интересную, но, к несчастью, не встречающуюся тему в документации Spark: как обучать модель в PySpark ML на датасете с разными типами данных (строковыми и числовыми)? Желание написать данную статью было вызвано необходимостью в течение нескольких дней просматривать Интернет в поисках необходимой статьи с кодом, ведь в официальном туториале от Spark приведён пример работы не то что с признаками одного типа данных, а вообще с одним признаком, а информация, как работать с несколькими колонками тем более разных типов данных, там отсутствует. Однако, подробно изучив возможности PySpark для работы с данными, у меня получилось написать рабочий код и понять как всё происходит, чем хочу поделиться и с вами. Так что полный вперёд, друзья!

Первоначально давайте импортируем все необходимые библиотеки для работы, а потом подробно разберём код, чтобы любой уважающий себя «ржавый чайник», как, впрочем, и я недавно, всё понял:


#импортируем необходимые библиотеки
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import *
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import pyspark.sql.functions as sf
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.regression import GBTRegressor
#other types of regression models
#можно использовать и другие виды регрессии
#from pyspark.ml.regression import LinearRegression
#from pyspark.ml.regression import RandomForestRegressor
#from pyspark.ml.regression import GeneralizedLinearRegression
#from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator

Теперь создадим (локальный) спарковский контекст и спарковскую сессию и проверим, всё ли работает, выведя полученное на экран. Создание спарковской сессии является отправной точкой в работе с датасетами в Spark:


#создаём спарк сессею
sc = SparkContext('local')
spark = SparkSession(sc)
spark

fcuhb3qtma_urlmlf8xrhtm98ik.png


Инструмент для работы с данными есть, теперь загрузим их. В статье используется датасет, который был взят с сайта соревнований по машинному обучению Kaggle:

который после скачивания хранится в path_csv в формате .csv и имеет следующие опции:

  • header: если в нашем файле первая строка является заголовком, то ставим «true»
  • delimiter: ставим знак, разделяющий данные одной строки по признакам, зачастую это "," или ";"
  • inferSchema: если «true», то PySpark автоматически определит тип каждой колонки, иначе вам придётся прописывать его самостоятельно


#загружаем данные формата .csv из path_csv
path_csv = 'greenhouse_gas_inventory_data_data.csv'
data = spark.read.format("csv")\
.option("header", "true")\
.option("delimiter", ",")\
.option("inferSchema", "true")\
.load(path_csv)

Чтобы лучше понимать, с какими данными мы имеем дело, посмотрим на несколько их строк:


#посмотрим на часть данных
data.show()

b58s5kf3jwh4ik3rhbh2fjrbalq.png

Также посмотрим сколько у нас всего строк в датасете:

#количество строк данных
data.select('year').count()

m-ze_wagu5dplmom4xdvojgli34.png


И, наконец, выведем типы наших данных, которые, как мы помним, мы попросили PySpark определить автоматически с помощью option(«inferSchema», «true»):


#посмотрим на типы всех наших колонок
data.printSchema()

yz3jhbqqeepkk_aug-9dk5aqtj8.png


Теперь переходим к нашему основному блюду — работе с несколькими признаками разных типов данных. Spark может обучить модель на преобразованных данных, где предсказываемая колонка является вектором и колонки с признаками тоже — вектор, что усложняет задачу… Но мы не сдаёмся, и чтобы обучить модель в PySpark мы будем использовать Pipeline, в который мы передадим некий план действий (переменная stages):

  1. шаг label_stringIdx: мы преобразовываем колонку датасета value, которую мы хотим предсказывать, в спарковскую строку-вектор и переназываем на label с параметром handleInvalid = 'keep', означающий, что наша предсказываемая колонка поддерживает null
  2. шаг stringIndexer: преобразовываем строковые колонки в спарковские категориальные строки
  3. шаг encoder: преобразовываем категориальные колонки в бинарные (числовые) вектора благодаря строковому преобразователю
  4. шаг assembler: чтобы обучить модель в Spark, мы должны колонки с признаками преобразовать в один вектор, что можно достичь с помощью VectorAssembler(), который берёт на вход название численных (для этого мы и преобразовали строки в числа в предыдущем шаге) колонок (assemblerInputs) и преобразовываем все колонки в один вектор с именем «features»
  5. шаг gbt: в качестве модели регрессии из PySpark ML выбран GBTRegressor, потому что бустинг наше всё


#value - это зависимая и предсказываемая переменная - метка
stages = []
label_stringIdx = StringIndexer(inputCol = 'value', outputCol = 'label', handleInvalid = 'keep')
stages += [label_stringIdx]

#depend on categorical columns: country and types of emission
#зависит от категориаьных колонок: страны и категории загрязнения
categoricalColumns = ['country_or_area', 'category']
for categoricalCol in categoricalColumns:
#преобразование категориальных колонок в бинарные вектора благодаря строковому преобразователю
stringIndexer = StringIndexer(inputCol = categoricalCol,
outputCol = categoricalCol + 'Index',
handleInvalid = 'keep')
encoder = OneHotEncoder(inputCol=stringIndexer.getOutputCol(),
outputCol=categoricalCol + "classVec")
stages += [stringIndexer, encoder]

#зависит от численной колонки: года
numericCols = ['year']
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
#преобразование нескольких колонок в вектор-колонку - признаки
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]

Разделим наш датасет на тренировочную и тестовую выборку в любимом соотношении соотношении 70% к 30% соответственно и начнём тренировать модель с помощью градиентного регрессионого дерева бустинга (GBTRegressor), который должен предсказывать вектор «label» по признакам, ранее объединённым в один вектор «features» с ограничением по итерируемости maxIter=10:


#делим данные на обучающую и тестовую выборки (30% тестовая)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

#тренируем модель (градиентного регрессионого дерева бустинга)
gbt = GBTRegressor(labelCol="label", featuresCol="features", maxIter=10)
stages += [gbt]

# задаем план stages для обучения модели
pipeline = Pipeline(stages=stages)

А теперь нам осталось только отправить компьютеру план действий и тренировочный датасет:


# тренируем модель
model = pipeline.fit(trainingData)

# делаем предсказания на тестовой выборке
predictions = model.transform(testData)

Сохраним нашу модель, чтобы мы всегда могли вернуться к её использованию без повторного обучения:


#сохраняем модель
pipeline.write().overwrite().save('model/gbtregr_model')

И если вы решили вновь начать использовать обученную модель для предсказаний, то просто напишите:


#загружаем модель для работы после обучения
load_model = pipeline.read().load('model/gbtregr_model')


Итак, мы посмотрели, как в инструменте для работы с большими данными на языке Python, PySpark, реализуется работа с несколькими признаковыми колонками разных типов данных.

Теперь пора применить это в ваших моделях…
 
Сверху Снизу