Подсчет количества пар товаров в продуктовых чеках с помощью трех инструментов: Python, Spark, SQL

Моя цель - предложение широкого ассортимента товаров и услуг на постоянно высоком качестве обслуживания по самым выгодным ценам.

Добрый день, уважаемые читатели! Не открою для большинства секрета, если скажу, что большая часть задач в материалах к учебным курсам сформулирована шаблонно. Какие-то вопросы в принципе могут представлять интерес, но очень оторваны от реальных потребностей бизнеса. Какие-то моменты выдернуты из книг, поэтому лучше знакомиться с ними, читая первоисточник. Но есть кейсы, которые на первый взгляд хоть и кажутся простыми и стереотипными, но, если присмотреться к ним более пристально, могут дать пищу для размышления. Вот на одной из таких полезных задач мне хотелось бы заострить внимание в данной заметке. Формулируется вопрос следующим образом: «Необходимо определить количество пар товаров в продуктовых чеках. Вывести 10 самых частых сочетаний». Пример, чек 1 содержит товар 1, товар 2, товар 3, а чек 2 -  товар 1, товар 2, товар 5. Следовательно, комбинация «товар 1, товар 2» встречается 2 раза, «товар 1 , товар 3» один раз и т.д.

В исходнике решать данный кейс предлагалось силами Python. Но реальная жизнь может потребовать от аналитика данных умения выполнять данное упражнение как с помощью SQL, так и Spark. Следовательно, рассмотрим три подхода, оставив за скобками разговора четвертый вариант – расчеты на платформах BI.

Вариант 1. Python

Вопрос не подкреплялся подходящим датасетом, поэтому я нашел массив данных на портале Kaggle. В нем 541909 записей, 25900 уникальных чеков и 4070 разных кодов товаров. Вы можете взять любой датасет, главное, чтобы в нем были два поля: номер чека и коды товаров либо их наименования. Данные можно сгенерировать и рандомно, но при этом нужно учитывать объем информации. На сформированном массиве в несколько сотен записей вы можете апробировать ваши решения, но вот смогут ли скрипты справиться с сотнями тысяч или даже миллионами строк, вопрос будет оставаться открытым.

Теперь можно приступить к описанию логики, которую затем мы постараемся отразить в коде. В каждом чеке могут встречаться дубликаты товарных кодов, от них нужно избавиться. Группируясь по кодам чеков, мы формируем массивы со списками покупок. Из элементов каждого такого массива составляем уникальные пары кодов. При этом учитываем важный нюанс, чтобы пары товаров не дублировались, сортируем коды в паре по возрастанию. Иначе мы неизбежно столкнемся с ситуацией, что «100-101» и «101-100» это разные варианты. Полученные пары товаров и количество этих пар нужно где-то хранить. В данном случае, на мой взгляд, оптимальнее всего использовать словарь, где ключом будет выступать кортеж из кодов. Один из возможных вариантов такого подхода.

df = pd.read_csv('/content/data_two_columns.csv')
df_group = df.groupby(by=['invoiceno'])['stockcode'].apply(set).reset_index(name='list_stockcode')
dict_couple_products_amount = {}

for current_set in df_group['list_stockcode']:
  if len(current_set)>1:
    list_combinations = [tuple(sorted(_)) for _ in list(itertools.combinations(current_set, 2))]
    for element_list_combinations in list_combinations:
      if dict_couple_products_amount.get(element_list_combinations) is None:
        dict_couple_products_amount[element_list_combinations] = 1
      else:
        dict_couple_products_amount[element_list_combinations] = dict_couple_products_amount[element_list_combinations] + 1

df_final = pd.DataFrame(dict_couple_products_amount.items(),columns=['couple_stockcode','count'])
df_final = df_final.sort_values(by=['count'], ascending=False)
df_final.head()

Можно, конечно, применить хеш-функцию для формирования ключей, но это автоматически приведет к усложнению решения. С полной версией ноутбука можно ознакомиться по указанной ссылке. Время заполнения словаря 45 секунд. Забегая вперед, хочу сказать, что это лучшее время по сравнению с другими вариантами.

Вариант 2. Spark (PySpark)

Код, написанный под данный инструмент аналитики, оказался самым медленным. Время работы либо около 2 минут либо более 4. Тестировал на платформах Google Colab и Community Databricks.

spark = SparkSession.builder.master('local[*]').appName("CoupleCodes").getOrCreate()

df = spark.read.format('csv') \
                             .options(inferSchema='true', delimiter=',', header='true') \
                             .load('/content/data_two_columns.csv')

df_group = df.groupBy('invoiceno').agg(F.collect_set('stockcode').alias('list_stockcode'))
df_group = df_group.filter(size(df_group.list_stockcode)>1)

def create_combinations_stockcode(list_stockcode:List)->List: 
 list_combinations = list(itertools.combinations(sorted(list_stockcode), 2))
 return [str(i[0])+', '+str(i[1]) for i in list_combinations]

combinations_stockcode_udf = udf(lambda x: create_combinations_stockcode(x),T.ArrayType(T.StringType()))
df_group_combinations = df_group.withColumn("combinations_stockcode", combinations_stockcode_udf(col("list_stockcode")))

df_group_combinations.select(explode(col('combinations_stockcode')).alias("couple_stockcode")) \
        .groupBy('couple_stockcode') \
        .count() \
        .sort(col("count").desc()) \
        .toPandas() \
        .head()

Краткие комментарии. Во-первых, я перевожу итоговый результат в датафрейм Pandas (toPandas().head()). Я сознательно пошел на такой шаг, чтобы улучшить отображение вывода, но практического смысла в этом нет. Во-вторых, пришлось применять функцию udf, внутри которой работает стандартная библиотека itertools, аналога которой я не нашел на PySpark. По факту в Spark перекочевала логика решения на Python. Исключением является только момент, где отрабатывает explode(), происходит разворот массива с парами кодов в колонку, а не их складирование в словарь. Но это может быть не оптимально! Мне кажется, что если лучше разобраться с возможностями PySpark, то можно написать более элегантное и быстрое решение. Но получилось, как получилось… В целом подход рабочий, но медленный.

Вариант 2. Spark (Scala)

Время - 5 минут. Тестировал только на Community Databricks. Тут без комментариев, так как в Scala не силен. Но опять же под подозрением функция udf.

val path = "/FileStore/tables/data_two_columns.csv"
val df = spark.read.option("delimiter", ",").option("header", "true").csv(path)

val df_group = df.groupBy("invoiceno").agg(collect_set("stockcode").as("list_stockcode"))

val df_group_filter = df_group.filter(size(df_group("list_stockcode"))>1)

val combinationsStockСodeUDF= udf((l: Array[String]) => l.sorted.toSeq.combinations(2).toList)
val df_group_combinations = df_group_filter.select(col("invoiceno"),combinationsStockСodeUDF(col("list_stockcode")).as("combinations_stockcode"))

val df_final = df_group_combinations.select(explode(col("combinations_stockcode")).as("couple_stockcode"))
df_final.groupBy("couple_stockcode").count().sort(col("count").desc).show(10)

Вариант 3. SQL

Несмотря на то, что большинство аналитиков довольно уверенно владеют языком запросов, данная задача может вызвать ряд затруднений. Дело все в том, что основная комбинаторика в SQL достигается за счет JOIN-ов. Если их использовать в лоб, тривиальный код не позволит обсчитать даже демонстрационные данные. Допустим, мы возьмем все  уникальные коды товаров. Чтобы получить пары применим CROSS JOIN c условием WHERE tbl1.stockcode < tbl2.stockcode, чтобы избавиться от повторов. Применяя декартово произведение на всем многообразии кодов, мы попадаем в ловушку. Образуется слишком много сочетаний, часть из которых не будет представлена в реальных чеках. Дальше к этому столбцу нужно будет делать LEFT JOIN по сложному условию так, чтобы и первый и второй код из пары были в сгруппированном массиве кодов товаров в чеке. Технически это можно достигнуть через функцию ANY. Что получается в итоге. Массив из пар кодов (больше 8 миллионов комбинаций) соединяется с другой таблицей, при этом в худшем случае можно ожидать более 800 соответствий для одной пары кодов. Выполнить до конца такой скрипт в разумное время у меня не получилось. Хотя маленькие блоки информации так анализировать вполне возможно.

Какой тогда выход? В принципе не нужно совсем отказываться от комбинаторики, но необходимо проводить ее на уровне чека, чтобы в пары кодов не попадали случайные сочетания. Ключевую роль здесь сыграют две не самые популярные функции ARRAY_AGG() и UNNEST() из арсенала PostgreSQL. Одна из них собирает значения столбца в массив при группировке, а вторая наоборот разворачивает элементы массива в строки. Работа данных функций не является уникальной, поэтому их аналоги должны быть и в других базах данных.

sql = """with tbl_no_duplicates as (select s.invoiceno, 
                                           s.stockcode 
                                    from sales as s 
                                    group by s.invoiceno, 
                                             s.stockcode),
      
              tbl_list_code_combinations as (select t.invoiceno, (select array_agg(concat(cast(t1.* as text),', ',cast(t2.* as text)))
                                                                  from unnest(array_agg(t.stockcode)) as t1 
                                                                                     cross join unnest(array_agg(t.stockcode)) as t2
                                                                  where t1.* < t2.*) as agg
                                             from tbl_no_duplicates as t
                                            group by t.invoiceno)

select unnest(t.agg) as couple_stockcode, count(*)
from tbl_list_code_combinations as t
group by  unnest(t.agg) 
order by count(*) desc
limit 10"""

Весь остальной код не требует особых комментариев. Остается лишь добавить, что скрипт полностью отработал за 55 секунд, лишь немного уступив варианту на Python. При этом важным преимуществом такого подхода является то, что результаты можно получить прямо в базе данных, минуя выгрузку.

На этом все. Всем здоровья, удачи и профессиональных успехов!

Источник: https://habr.com/ru/post/657623/


Интересные статьи

Интересные статьи

Это продолжение туториала по JUnit 5. Введение опубликовано здесь.Научитесь создавать отчеты о покрытии кода для тестов JUnit с помощью подключаемого модуля JaCoCo Maven.
Бурение скважин всегда было и будет дорогостоящим занятием, а бурение в таких местах планеты как пустыня Сахара тем более. В объеме капитальных затрат на обустройство месторождений, затраты на бурение...
Liquibase — это инструмент управления изменениями в базе данных. С его помощью вы можете отслеживать изменения в базе данных, сделанные с помощью SQL (или XML) скриптов. ...
Понимание того, как MySQL использует память, является ключом к настройке системы для достижения оптимальной производительности, также как и для устранения неполадок в случаях ненормальног...
К написанию статьи подтолкнул вот этот комментарий. А точнее, одна фраза из него. … расходовать память или такты процессора на элементы в миллиардных объёмах — это нехорошо… Так сложилось, ...