FP-Growth算法自定义扩展(certainty,conviction)指标

FP-Growth 算法的指标扩展

简短回答:Spark MLlib 的 FP-Growth 算法默认不直接提供 certainty 和 conviction 指标,但我们可以轻松计算它们!

1. 标准指标 vs 扩展指标

Spark FP-Growth 提供的默认指标:

指标

说明

公式

支持度 (support)

项集出现频率

P(A∪B)

置信度 (confidence)

规则的可靠性

P(B|A) = support(A∪B) / support(A)

提升度 (lift)

规则的有效性

P(B|A) / P(B)

可以计算的扩展指标:

指标

说明

公式

确定性 (certainty)

规则的可信度

(confidence(A→B) + confidence(B→A)) / 2

确信度 (conviction)

规则的反向度量

(1 – support(B)) / (1 – confidence(A→B))

2. 完整代码:计算所有指标

import sys
import os
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import split, col, udf, array_contains, size, lit
from pyspark.ml.fpm import FPGrowth
import pandas as pd

# Create the SparkSession. The original builder chain was split across
# lines with no continuation characters (a SyntaxError); wrapped in
# parentheses instead of backslashes.
spark = (
    SparkSession.builder
    .appName("FP_Growth_Extended_Metrics")
    .master("local[2]")
    .getOrCreate()
)

# Market-basket data: (transaction_id, customer_id, date, comma-separated items).
data = [
    (1, 1001, "2024-01-01", "milk,bread,butter"),
    (2, 1001, "2024-01-01", "bread,eggs,juice"),
    (3, 1002, "2024-01-01", "milk,bread,beer,diapers"),
    (4, 1003, "2024-01-02", "bread,butter,cheese"),
    (5, 1004, "2024-01-02", "milk,bread,eggs,beer"),
    (6, 1004, "2024-01-02", "beer,diapers"),
    (7, 1005, "2024-01-03", "milk,bread,diapers,beer"),
    (8, 1006, "2024-01-03", "bread,butter,milk"),
    (9, 1007, "2024-01-03", "eggs,bread,milk"),
    (10, 1008, "2024-01-04", "diapers,beer,milk"),
    (11, 1009, "2024-01-04", "bread,milk,butter,eggs"),
    (12, 1010, "2024-01-04", "beer,diapers,milk,bread"),
    (13, 1011, "2024-01-05", "bread,cheese,wine"),
    (14, 1012, "2024-01-05", "milk,bread,eggs,butter"),
    (15, 1013, "2024-01-05", "diapers,beer,bread"),
]

# Build the DataFrame and split the item string into an array column,
# which is what FPGrowth expects in `itemsCol`.
df = spark.createDataFrame(data, ["transaction_id", "customer_id", "date", "items_str"])
df = df.withColumn("items", split(col("items_str"), ","))

print("=" * 80)
print("FP-Growth 算法扩展指标分析")
print("=" * 80)

# "\n" restored -- the original string literal was split by a raw newline.
print("\n1. 运行标准 FP-Growth 算法")
print("-" * 40)

# Run FP-Growth: itemsets must appear in >= 20% of baskets; rules need >= 30% confidence.
fp = FPGrowth(itemsCol="items", minSupport=0.2, minConfidence=0.3)
model = fp.fit(df)

# Frequent itemsets and association rules produced by the model.
freq_itemsets = model.freqItemsets
association_rules = model.associationRules

print(f"发现 {freq_itemsets.count()} 个频繁项集")
print(f"发现 {association_rules.count()} 条关联规则")

print("\n2. 计算扩展指标")
print("-" * 40)

# Build a support lookup keyed by the sorted item tuple for O(1) access.
# df.count() is hoisted out of the loop (the original triggered a full
# Spark count action on every iteration).
total_transactions = df.count()
support_dict = {}
for row in freq_itemsets.collect():
    # Lists are unhashable; a sorted tuple gives a canonical dict key.
    itemset_key = tuple(sorted(row['items']))
    support_dict[itemset_key] = row['freq'] / total_transactions

# 计算 certainty 和 conviction 的自定义函数
def calculate_extended_metrics(antecedent, consequent, confidence, lift,
                               support_lookup=None):
    """Compute extended quality metrics for an association rule A -> B.

    Args:
        antecedent: list of items on the rule's left-hand side (A).
        consequent: list of items on the rule's right-hand side (B).
        confidence: confidence(A -> B) as produced by FP-Growth.
        lift: lift(A -> B); accepted for call-site compatibility, unused.
        support_lookup: optional mapping {tuple(sorted(items)): support};
            defaults to the module-level ``support_dict`` built from the
            frequent itemsets (backward compatible).

    Returns:
        dict with 'certainty', 'conviction' (the string 'INF' when the
        rule is never wrong), 'leverage', 'coverage' and
        'confidence_reverse', each rounded to 4 decimal places.
    """
    supports = support_dict if support_lookup is None else support_lookup

    # Canonical keys for the support lookup; itemsets that never reached
    # minSupport fall back to 0.
    antecedent_key = tuple(sorted(antecedent))
    consequent_key = tuple(sorted(consequent))
    union_key = tuple(sorted(set(antecedent).union(consequent)))

    support_antecedent = supports.get(antecedent_key, 0)
    support_consequent = supports.get(consequent_key, 0)
    support_union = supports.get(union_key, 0)

    # Reverse confidence: confidence(B -> A) = support(A∪B) / support(B).
    if support_consequent > 0:
        confidence_reverse = support_union / support_consequent
    else:
        confidence_reverse = 0

    # 1. Certainty: mean of the two directional confidences. Both terms
    # are non-negative, so the original's extra "> 0" guard was redundant.
    certainty = (confidence + confidence_reverse) / 2

    # 2. Conviction: (1 - support(B)) / (1 - confidence). A rule that is
    # never wrong (confidence == 1) has infinite conviction. The original
    # tested `confidence == 1` and then `confidence < 1` again -- collapsed
    # into a single branch.
    if confidence >= 1:
        conviction = float('inf')
    else:
        conviction = (1 - support_consequent) / (1 - confidence)

    # 3. Leverage: observed joint support minus the independence expectation.
    leverage = support_union - (support_antecedent * support_consequent)

    # 4. Coverage: how often the antecedent occurs at all.
    coverage = support_antecedent

    return {
        'certainty': round(certainty, 4),
        'conviction': round(conviction, 4) if conviction != float('inf') else 'INF',
        'leverage': round(leverage, 4),
        'coverage': round(coverage, 4),
        'confidence_reverse': round(confidence_reverse, 4)
    }

# Attach the extended metrics to every rule through a Spark UDF.
from pyspark.sql import Row
from pyspark.sql.types import FloatType
from pyspark.sql.types import ArrayType

def calculate_metrics_udf(antecedent, consequent, confidence, lift):
    # Bridge between Spark row values and the plain-Python helper:
    # flatten the metric dict into a fixed-order float list.
    computed = calculate_extended_metrics(list(antecedent), list(consequent), confidence, lift)
    if computed['conviction'] == 'INF':
        conviction_value = float('inf')
    else:
        conviction_value = float(computed['conviction'])
    return [computed['certainty'],
            conviction_value,
            computed['leverage'],
            computed['coverage'],
            computed['confidence_reverse']]

# Returns [certainty, conviction, leverage, coverage, confidence_reverse].
calculate_metrics_udf_spark = udf(calculate_metrics_udf, ArrayType(FloatType()))

# New column holding the metric array for each rule.
rules_with_metrics = association_rules.withColumn(
    "extended_metrics",
    calculate_metrics_udf_spark("antecedent", "consequent", "confidence", "lift")
)

# Split the metric array into individual, analysis-friendly columns.
rules_extended = rules_with_metrics.select(
    "antecedent",
    "consequent",
    "confidence",
    "lift",
    (col("extended_metrics")[0]).alias("certainty"),
    (col("extended_metrics")[1]).alias("conviction"),
    (col("extended_metrics")[2]).alias("leverage"),
    (col("extended_metrics")[3]).alias("coverage"),
    (col("extended_metrics")[4]).alias("confidence_reverse")
)

# "\n" restored -- the original string literal was split by a raw newline.
print("\n3. 完整指标分析")
print("-" * 40)

# Show every rule with all metrics, best confidence first.
print("关联规则完整指标表:")
rules_extended.orderBy("confidence", ascending=False).show(truncate=False)

# Broken "\n" literals restored throughout this section.
print("\n4. 指标解释和业务意义")
print("-" * 40)

# Pull the rules to the driver for per-rule commentary.
rules_list = rules_extended.collect()

if len(rules_list) > 0:
    print("\n指标含义说明:")
    print("1. 支持度 (Support): 规则中的商品组合出现的频率")
    print("2. 置信度 (Confidence): 购买A的情况下购买B的概率")
    print("3. 提升度 (Lift): 规则的有效性,>1表明正相关")
    print("4. 确定性 (Certainty): 双向可信度的平均值")
    print("5. 确信度 (Conviction): 规则的反向度量,越高越好")
    print("6. 杠杆率 (Leverage): 实际联合出现与期望的差异")
    print("7. 覆盖度 (Coverage): 前件商品的流行度")

    print("\n高质量规则示例:")
    print("-" * 30)

    # Comment on (up to) the first five rules.
    for i, rule in enumerate(rules_list[:5], 1):
        antecedent = rule['antecedent']
        consequent = rule['consequent']
        confidence = rule['confidence']
        lift = rule['lift']
        certainty = rule['certainty']
        conviction = rule['conviction']
        leverage = rule['leverage']

        # "→" restored between antecedent and consequent (the arrow
        # character was lost in the original text).
        print(f"\n规则 {i}: {antecedent} → {consequent}")
        print(f"  置信度: {confidence:.1%}")
        print(f"  提升度: {lift:.2f}")
        print(f"  确定性: {certainty:.2f}")
        print(f"  确信度: {conviction:.2f}")

        # Business interpretation against simple thresholds.
        if lift > 1 and confidence > 0.5:
            print(f"   强关联规则:购买{antecedent}会显著增加购买{consequent}的可能性")

        if certainty > 0.6:
            print(f"  ✅ 高确定性:这个关联是双向的")

        if conviction > 1.5:
            print(f"   高确信度:规则很少出错")

# 5. Best rules by each individual metric.
print("\n5. 按不同指标的最佳规则")
print("-" * 40)

# Top 3 by confidence.
print("按置信度排序(最高):")
rules_extended.orderBy("confidence", ascending=False).limit(3).show(truncate=False)

# Top 3 by lift.
print("\n按提升度排序(最高):")
rules_extended.orderBy("lift", ascending=False).limit(3).show(truncate=False)

# Top 3 by certainty.
print("\n按确定性排序(最高):")
rules_extended.orderBy("certainty", ascending=False).limit(3).show(truncate=False)

# Top 3 by conviction, excluding the infinite (never-wrong) rules.
# The original chained call was split across lines without continuation
# characters (a SyntaxError); wrapped in parentheses instead.
print("\n按确信度排序(最高,有限值):")
(rules_extended.filter(col("conviction") < float('inf'))
    .orderBy("conviction", ascending=False)
    .limit(3).show(truncate=False))

# 6. Composite scoring system.
# "\n" restored -- the original string literal was split by a raw newline.
print("\n6. 综合评分系统")
print("-" * 40)
# 计算综合得分
def calculate_composite_score(confidence, lift, certainty, conviction):
    """Combine four rule metrics into one weighted quality score.

    Weights: confidence 40%, lift 30% (capped at 5), certainty 20%,
    conviction 10% (capped at 10). Capped metrics are rescaled to [0, 1]
    before weighting so no single metric dominates.
    """
    capped_lift = min(lift, 5) / 5
    capped_conviction = min(conviction, 10) / 10
    return (confidence * 0.4
            + capped_lift * 0.3
            + certainty * 0.2
            + capped_conviction * 0.1)

# Register the scoring function as a Spark UDF and rank the rules by it.
from pyspark.sql.types import DoubleType

composite_score_udf = udf(calculate_composite_score, DoubleType())

score_column = composite_score_udf("confidence", "lift", "certainty", "conviction")
rules_scored = rules_extended.withColumn("composite_score", score_column)

# Show the five best-scoring rules.
top_scored = rules_scored.orderBy("composite_score", ascending=False).limit(5)
print("综合评分最高的规则:")
top_scored.show(truncate=False)

# 7. Advanced analysis: rule classification.
# "\n" restored -- the original string literal was split by a raw newline.
print("\n7. 规则分类分析")
print("-" * 40)

# 根据指标分类规则
def classify_rule(confidence, lift, certainty):
    """Bucket a rule into a quality tier from its confidence and lift.

    ``certainty`` is accepted so the signature matches the UDF call site,
    but it does not influence the tier.
    """
    # Tiers checked from strictest to loosest; first match wins.
    tiers = (
        (0.7, 2, "强推荐规则"),
        (0.5, 1.5, "推荐规则"),
        (0.3, 1, "一般规则"),
    )
    for min_confidence, min_lift, label in tiers:
        if confidence > min_confidence and lift > min_lift:
            return label
    return "弱规则"

# Register the classifier as a UDF and label every rule.
classify_udf = udf(classify_rule, StringType())

category_column = classify_udf("confidence", "lift", "certainty")
rules_classified = rules_scored.withColumn("rule_category", category_column)

# Count how many rules fall into each tier, biggest tier first.
category_stats = (rules_classified
                  .groupBy("rule_category")
                  .count()
                  .orderBy("count", ascending=False))
print("规则分类统计:")
category_stats.show()

# 8. Visualization: metric distributions and correlations.
# "\n" restored -- the original string literal was split by a raw newline.
print("\n8. 指标相关性分析")
print("-" * 40)

try:
    import matplotlib.pyplot as plt
    import seaborn as sns
    import numpy as np

    # Bring the (small) rule set to the driver as a pandas DataFrame.
    rules_pd = rules_extended.toPandas()

    # Replace infinite conviction with a large stand-in, then cap at 10
    # so the histogram and correlations are not dominated by outliers.
    rules_pd['conviction'] = rules_pd['conviction'].replace([float('inf')], 100)
    rules_pd['conviction'] = rules_pd['conviction'].clip(upper=10)

    # Correlation matrix across all numeric metrics.
    metrics_columns = ['confidence', 'lift', 'certainty', 'conviction', 'leverage', 'coverage']
    correlation_matrix = rules_pd[metrics_columns].corr()

    fig, axes = plt.subplots(2, 3, figsize=(15, 10))

    # 1. Confidence distribution.
    axes[0, 0].hist(rules_pd['confidence'], bins=10, edgecolor='black', alpha=0.7)
    axes[0, 0].set_xlabel('置信度')
    axes[0, 0].set_ylabel('规则数量')
    axes[0, 0].set_title('置信度分布')

    # 2. Lift distribution.
    axes[0, 1].hist(rules_pd['lift'], bins=10, edgecolor='black', alpha=0.7, color='green')
    axes[0, 1].set_xlabel('提升度')
    axes[0, 1].set_ylabel('规则数量')
    axes[0, 1].set_title('提升度分布')

    # 3. Certainty distribution.
    axes[0, 2].hist(rules_pd['certainty'], bins=10, edgecolor='black', alpha=0.7, color='orange')
    axes[0, 2].set_xlabel('确定性')
    axes[0, 2].set_ylabel('规则数量')
    axes[0, 2].set_title('确定性分布')

    # 4. Conviction distribution (already capped above).
    axes[1, 0].hist(rules_pd['conviction'], bins=10, edgecolor='black', alpha=0.7, color='red')
    axes[1, 0].set_xlabel('确信度')
    axes[1, 0].set_ylabel('规则数量')
    axes[1, 0].set_title('确信度分布')

    # 5. Confidence vs lift scatter plot.
    axes[1, 1].scatter(rules_pd['confidence'], rules_pd['lift'], alpha=0.6)
    axes[1, 1].set_xlabel('置信度')
    axes[1, 1].set_ylabel('提升度')
    axes[1, 1].set_title('置信度 vs 提升度')

    # 6. Correlation heatmap.
    im = axes[1, 2].imshow(correlation_matrix.values, cmap='coolwarm', vmin=-1, vmax=1)
    axes[1, 2].set_xticks(range(len(metrics_columns)))
    axes[1, 2].set_yticks(range(len(metrics_columns)))
    axes[1, 2].set_xticklabels(metrics_columns, rotation=45)
    axes[1, 2].set_yticklabels(metrics_columns)
    axes[1, 2].set_title('指标相关性热图')

    # Color bar for the heatmap.
    plt.colorbar(im, ax=axes[1, 2])

    plt.tight_layout()
    plt.savefig('fp_growth_extended_metrics.png', dpi=150, bbox_inches='tight')
    print("图表已保存为 'fp_growth_extended_metrics.png'")

    # Print the correlation matrix as well.
    print("\n指标相关性矩阵:")
    print(correlation_matrix.round(2))

except ImportError:
    # Plotting libraries are optional; the analysis above already ran.
    print("需要安装matplotlib和seaborn进行可视化")
    print("安装命令: pip install matplotlib seaborn")

# 9. Persist the complete result set.
# "\n" restored -- the original string literals were split by raw newlines.
print("\n9. 保存分析结果")
print("-" * 40)

# Single CSV part file with all metrics and the composite score.
rules_scored.coalesce(1).write.csv(
    "fp_growth_complete_rules.csv",
    header=True,
    mode="overwrite"
)
print("✓ 完整规则已保存到 fp_growth_complete_rules.csv")

# Report summary.
print("\n" + "=" * 80)
print("分析报告摘要")
print("=" * 80)

total_rules = rules_extended.count()
high_confidence_rules = rules_extended.filter(col("confidence") > 0.7).count()
high_lift_rules = rules_extended.filter(col("lift") > 2).count()
high_certainty_rules = rules_extended.filter(col("certainty") > 0.6).count()

print(f"总规则数: {total_rules}")
# Guard against ZeroDivisionError when no rules were mined (the original
# divided by total_rules unconditionally).
if total_rules > 0:
    print(f"高置信度规则 (>70%): {high_confidence_rules} ({high_confidence_rules/total_rules:.1%})")
    print(f"高提升度规则 (>2): {high_lift_rules} ({high_lift_rules/total_rules:.1%})")
    print(f"高确定性规则 (>0.6): {high_certainty_rules} ({high_certainty_rules/total_rules:.1%})")

# Business recommendations.
print("\n业务提议:")
print("1. 关注置信度和提升度都高的规则进行营销")
print("2. 确定性高的规则适合双向推荐")
print("3. 确信度高的规则更可靠,适合作为核心策略")

# Shut Spark down.
spark.stop()
print("\n" + "=" * 80)
print("分析完成!")
print("=" * 80)

简化版本:仅计算 certainty 和 conviction

# Ensure PySpark workers use the same interpreter as the driver.
import sys
import os
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

from pyspark.sql import SparkSession
from pyspark.ml.fpm import FPGrowth
from pyspark.sql.functions import split, col, udf
from pyspark.sql.types import DoubleType

spark = SparkSession.builder.master("local").getOrCreate()

# A tiny five-basket dataset; items are comma-separated.
data = [(1, "milk,bread"), (2, "bread,beer"), (3, "milk,bread,beer"), 
        (4, "bread,butter"), (5, "milk,bread,butter")]

df = spark.createDataFrame(data, ["id", "items"])
df = df.withColumn("items", split("items", ","))

# Mine frequent itemsets and rules.
fp = FPGrowth(itemsCol="items", minSupport=0.3, minConfidence=0.5)
model = fp.fit(df)

# Support lookup: {sorted item tuple: relative support}.
total_transactions = df.count()
support_dict = {
    tuple(sorted(row['items'])): row['freq'] / total_transactions
    for row in model.freqItemsets.collect()
}

# 计算certainty和conviction的函数
def calc_certainty_conviction(antecedent, consequent, confidence, support_lookup=None):
    """Return (certainty, conviction) for the association rule A -> B.

    certainty  = (confidence(A->B) + confidence(B->A)) / 2
    conviction = (1 - support(B)) / (1 - confidence(A->B)); inf when
    confidence == 1 (the rule is never wrong).

    Args:
        antecedent: items on the rule's left-hand side (A).
        consequent: items on the rule's right-hand side (B).
        confidence: confidence(A -> B) from FP-Growth.
        support_lookup: optional {tuple(sorted(items)): support} mapping;
            defaults to the module-level ``support_dict`` (backward compatible).
    """
    supports = support_dict if support_lookup is None else support_lookup

    # The antecedent's own support was looked up but never used in the
    # original; only the consequent and the union are needed here.
    consequent_key = tuple(sorted(consequent))
    union_key = tuple(sorted(set(antecedent).union(consequent)))

    support_consequent = supports.get(consequent_key, 0)
    support_union = supports.get(union_key, 0)

    # confidence(B -> A); 0 when B never reached minSupport.
    if support_consequent > 0:
        confidence_reverse = support_union / support_consequent
    else:
        confidence_reverse = 0
    certainty = (confidence + confidence_reverse) / 2

    # Single branch replaces the original's duplicated confidence-==-1 /
    # confidence-<-1 checks.
    if confidence >= 1:
        conviction = float('inf')
    else:
        conviction = (1 - support_consequent) / (1 - confidence)

    return (float(certainty), float(conviction))

# Register the UDF. ArrayType was referenced but never imported in this
# script (NameError at runtime) -- imported locally here as the fix.
from pyspark.sql.types import ArrayType

calc_metrics_udf = udf(calc_certainty_conviction, 
                       returnType=ArrayType(DoubleType()))

# Attach the metric pair, then split it into named columns.
rules_with_extra = model.associationRules.withColumn(
    "extra_metrics",
    calc_metrics_udf("antecedent", "consequent", "confidence")
).select(
    "antecedent",
    "consequent",
    "confidence",
    "lift",
    col("extra_metrics")[0].alias("certainty"),
    col("extra_metrics")[1].alias("conviction")
)

print("带有certainty和conviction指标的规则:")
rules_with_extra.show(truncate=False)

spark.stop()

各指标详细说明

1.Certainty (确定性)

certainty = (confidence(A→B) + confidence(B→A)) / 2
  • 取值范围: 0-1
  • 意义: 衡量规则的双向可信度
  • 判断标准:
    • >0.7: 强确定性
    • 0.5-0.7: 中等确定性
    • <0.5: 弱确定性

2.Conviction (确信度)

conviction = (1 - support(B)) / (1 - confidence(A→B))
  • 取值范围: 0-∞
  • 意义: 衡量规则预测错误的频率
  • 判断标准:
    • ∞: 完美规则(从不错误)
    • >1.5: 高质量规则
    • 1.0: 与随机无差异
    • <1.0: 规则比随机猜测还差

3.其他可用指标

# Leverage (杠杆率)
leverage = support(A∪B) - support(A) × support(B)

# Coverage (覆盖度)
coverage = support(A)

# Added Value (附加值)
added_value = confidence(A→B) - support(B)

总结

虽然 Spark 的 FP-Growth 默认不提供 certainty 和 conviction 指标,但我们可以基于已有的支持度、置信度信息轻松计算出来。这些扩展指标能提供更全面的规则评估,协助你做出更好的业务决策!

© 版权声明

相关文章

暂无评论

none
暂无评论...