
Hand-Written Code (Attention + Evaluation Metrics + Loss Functions)

March 23, 2025 • 动手学推荐系统 (Hands-On Recommender Systems)

MultiHeadAttention
Evaluation metrics: AUC, GAUC, Precision, Recall, NDCG, HR, F1, MAP, MRR
Loss functions: MAE, MSE, BCE Loss, Softmax Loss, BPR Loss, Contrastive Loss, Triplet Loss, NCE Loss, InfoNCE Loss, KL Loss

MultiHeadAttention
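Multi-head attention runs scaled dot-product attention in num_heads parallel subspaces and concatenates the results. Per head:

$$ Attention(Q, K, V) = softmax(\frac{QK^T}{\sqrt{d_{head}}})V $$

Scaling by $\sqrt{d_{head}}$ keeps the dot products from growing with the dimension, which would otherwise push the softmax into regions with vanishing gradients. The implementation below follows this directly.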

import torch
import torch.nn as nn
import torch.nn.functional as F

class MultiHeadAttention(nn.Module):
    def __init__(self, embed_dim, num_heads, dropout_rate=0.1):
        super(MultiHeadAttention, self).__init__()
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        assert self.embed_dim % self.num_heads == 0
        self.head_dim = self.embed_dim // self.num_heads
        
        self.Wq = nn.Linear(self.embed_dim, self.embed_dim, bias=False)
        self.Wk = nn.Linear(self.embed_dim, self.embed_dim, bias=False) 
        self.Wv = nn.Linear(self.embed_dim, self.embed_dim, bias=False)
        
        self.dropout = nn.Dropout(dropout_rate)
        
        
    def forward(self, q, k, v, mask=None):
        bs, seq_len, dim = q.shape
        assert dim == self.embed_dim
        q = self.Wq(q).view(bs, -1, self.num_heads, self.head_dim).transpose(1, 2) # bs, num_heads, seq_len, head_dim
        k = self.Wk(k).view(bs, -1, self.num_heads, self.head_dim).transpose(1, 2)
        v = self.Wv(v).view(bs, -1, self.num_heads, self.head_dim).transpose(1, 2)
        
        scores = torch.matmul(q, k.transpose(-1, -2)) / (self.head_dim ** 0.5) # bs, num_heads, seq_len, seq_len
        if mask is not None:
            # positions where mask == 0 get -1e9 so softmax assigns them ~zero weight
            scores = scores.masked_fill(mask == 0, -1e9)
        attention_weights = F.softmax(scores, dim=-1)
        attention_weights = self.dropout(attention_weights)
        
        output = torch.matmul(attention_weights, v).transpose(1, 2).contiguous().view(bs, seq_len, self.embed_dim)
        
        return output # bs, seq_len, embed_dim

def test():
    m = MultiHeadAttention(100, 2)  # embed_dim=100, num_heads=2
    q = torch.randn(100, 3, 100)    # (batch, seq_len, embed_dim)
    o = m(q, q, q)                  # self-attention: q = k = v
    print(o.shape)                  # torch.Size([100, 3, 100])
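The mask argument should be a tensor broadcastable to (bs, num_heads, seq_len, seq_len) with 0 at positions to be blocked. A minimal causal-mask sketch reusing the dimensions from test (note that a full Transformer multi-head attention would additionally apply an output projection W_o, omitted above):

def test_causal_mask():
    m = MultiHeadAttention(100, 2)
    q = torch.randn(100, 3, 100)
    # lower-triangular (seq_len, seq_len) mask: position i may attend to j <= i;
    # it broadcasts over the batch and head dimensions inside masked_fill
    causal_mask = torch.tril(torch.ones(3, 3))
    o = m(q, q, q, mask=causal_mask)
    print(o.shape)  # torch.Size([100, 3, 100])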

AUC and GAUC

| Truth \ Prediction | Positive | Negative |
| ------------------ | -------- | -------- |
| Positive           | TP       | FN       |
| Negative           | FP       | TN       |

$$ TPR = \frac{TP}{TP + FN}, FPR = \frac{FP}{TN + FP} $$

Definitions of AUC

  • The area under the ROC curve (FPR on the x-axis, TPR on the y-axis)
  • The probability that a randomly drawn positive-negative pair is ordered correctly, i.e. the positive example receives a higher predicted score than the negative one. For example, with y_true = [1, 0, 0] and y_pred = [0.8, 0.6, 0.3], both positive-negative pairs are ordered correctly, so AUC = 2/2 = 1.

cal_auc below implements the pair-counting definition directly in O(n^2); cal_auc2 sorts by score once and integrates the ROC curve with the trapezoid rule.
import numpy as np

def cal_auc(y_true, y_pred):
    # fz counts correctly ordered positive-negative pairs, fm counts all such pairs
    fz, fm = 0, 0
    for i in range(0, len(y_true) - 1):
        for j in range(i + 1, len(y_true)):
            if y_true[i] != y_true[j]:
                fm += 1
                if (y_true[i] > y_true[j] and y_pred[i] > y_pred[j]) or (y_true[i] < y_true[j] and y_pred[i] < y_pred[j]):
                    fz += 1
                elif y_pred[i] == y_pred[j]:
                    fz += 0.5  # tied scores get half credit, matching the standard definition
    return fz / fm
def cal_auc2(y_true, y_pred):
    # sort by score descending and integrate the ROC curve with the trapezoid rule
    sort_indices = np.argsort(y_pred)[::-1]
    y_true_sorted = y_true[sort_indices]
    tp, fp = 0, 0
    auc = 0
    prev_fpr, prev_tpr = 0, 0
    n = len(y_true)
    realTrueNum = sum(y_true)
    for label in y_true_sorted:
        if label == 1:
            tp += 1
        else:
            fp += 1
        tpr = tp / realTrueNum
        fpr = fp / (n - realTrueNum)
        # accumulate the trapezoid between consecutive ROC points at every step;
        # gating this on tp > 0 and fp > 0 would drop the first trapezoid
        auc += (fpr - prev_fpr) * (prev_tpr + tpr) / 2
        prev_fpr = fpr
        prev_tpr = tpr
    return auc
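
GAUC (Group AUC) computes AUC within each group (in recommendation, usually per user) and averages the per-group values with weights such as the number of impressions; groups containing only one class are skipped because AUC is undefined there:

$$ GAUC = \frac{\sum_g w_g \cdot AUC_g}{\sum_g w_g} $$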


def cal_gauc(y_true, y_pred, group_ids, weights):
    groups = np.unique(group_ids)
    total_auc = 0
    total_weight = 0
    assert len(weights) == len(groups)
    for g in groups:
        mask = group_ids == g
        y_true_g = y_true[mask]
        y_pred_g = y_pred[mask]
        # skip groups that contain only one class: AUC is undefined there
        if len(np.unique(y_true_g)) < 2:
            continue
        auc_g = cal_auc(y_true_g, y_pred_g)
        total_auc += auc_g * weights[g]
        total_weight += weights[g]
    
    return total_auc / total_weight if total_weight != 0 else 0
        
        
from sklearn.metrics import roc_auc_score   
  
def test_auc():
    y_true = np.array([1,   0,   0,   0,   1,    0,   1,    0,    0,   1  ])
    y_pred = np.array([0.9, 0.4, 0.3, 0.1, 0.35, 0.6, 0.65, 0.32, 0.8, 0.7])
    groups = np.array([0, 0, 0, 1, 1, 1, 2, 2, 2, 2])
    weights = {0:1, 1:1, 2:1}
    auc1 = cal_auc(y_true, y_pred)
    auc2 = cal_auc2(y_true, y_pred)
    auc3 = roc_auc_score(y_true, y_pred)
    gauc = cal_gauc(y_true, y_pred, groups, weights)
    print(auc1, auc2, auc3, gauc)

Recall, Precision, NDCG, HR, F1, MAP, MRR

$$ Recall@K = \frac{TP@K}{TP@K + FN@K} $$

$$ Precision@K = \frac{TP@K}{TP@K + FP@K} $$

$$ NDCG@K=\frac{DCG@K}{IDCG@K}, DCG@K=\sum_{i=1}^K\frac{rel_i}{log_2(i+1)}, IDCG@K=\sum_{i=1}^K\frac{rel_i^{sorted}}{log_2(i+1)} $$

$$ HR@K= 1\quad if\quad TP@K>0 \quad else \quad 0 $$

$$ F1@K = 2 \cdot \frac{Precision@K \cdot Recall@K}{Precision@K + Recall@K} $$

$$ MAP@K = \frac{1}{|Q|}\sum_{q=1}^{|Q|}AP@K(q), AP@K(q)=\frac{1}{min(K, TP@K + FN@K)}\sum_{k=1}^K Precision@k \cdot rel_k $$

$$ MRR@K = \frac{1}{|Q|}\sum_{q=1}^{|Q|}RR@K(q), RR@K(q) = \frac{1}{rank_q}\quad if \quad rank_q\le K\quad else \quad 0 $$

def cal_recall_at_k(y_true, y_pred, k=10):
    top_k = np.argsort(y_pred)[::-1][:k]
    hits = y_true[top_k]
    total_positive = np.sum(y_true)
    return np.sum(hits) / total_positive if total_positive != 0 else 0
def cal_precision_at_k(y_true, y_pred, k=10):
    top_k = np.argsort(y_pred)[::-1][:k]
    hits = y_true[top_k]
    return np.sum(hits) / k

def cal_dcg_at_k(y_true, y_pred, k=10):
    order = np.argsort(y_pred)[::-1]
    y_true_sorted = y_true[order]
    discounts = 1 / np.log2(np.arange(2, k + 2))
    dcg = np.sum(y_true_sorted[:k] * discounts)
    return dcg

def cal_ndcg_at_k(y_true, y_pred, k=10):
    dcg = cal_dcg_at_k(y_true, y_pred, k)
    ideal_y_true = np.sort(y_true)[::-1]
    idcg = cal_dcg_at_k(ideal_y_true, ideal_y_true, k)
    return dcg / idcg if idcg else 0

def cal_hit_rate_at_k(y_true, y_pred, k=10):
    top_k = np.argsort(y_pred)[::-1][:k]
    hits = y_true[top_k]
    return 1 if np.sum(hits) else 0

def cal_f1_at_k(y_true, y_pred, k=10):
    precision = cal_precision_at_k(y_true, y_pred, k)
    recall = cal_recall_at_k(y_true, y_pred, k)
    if precision + recall == 0:
        return 0  # avoid division by zero when there are no hits in the top k
    return 2 * (precision * recall) / (precision + recall)

def cal_average_precision_at_k(y_true, y_pred, k=10):
    ap = 0
    total_positives = np.sum(y_true)
    if total_positives == 0:
        return 0
    for i in range(1, k + 1):
        precision = cal_precision_at_k(y_true, y_pred, i)
        rel = y_true[np.argsort(y_pred)[::-1][i-1]] # whether the item at rank i is relevant
        ap += precision * rel
    return ap / min(k, total_positives)

def cal_reciprocal_rank_at_k(y_true, y_pred, k=10):
    rank = np.argsort(y_pred)[::-1]
    for i, idx in enumerate(rank):
        if y_true[idx] == 1:
            if i + 1 <= k:
                return 1 / (i + 1)
            else:
                return 0
    return 0

def test_metrics():
    y_true = np.array([1, 0, 0, 1, 0, 0, 0, 1, 0, 0])
    y_pred = np.array([0.9, 0.2, 0.3, 0.8, 0.1, 0.4, 0.5, 0.7, 0.6, 0.1])
    recall = cal_recall_at_k(y_true, y_pred)
    precision = cal_precision_at_k(y_true, y_pred)
    ndcg = cal_ndcg_at_k(y_true, y_pred)
    hr = cal_hit_rate_at_k(y_true, y_pred)
    f1 = cal_f1_at_k(y_true, y_pred)
    ap = cal_average_precision_at_k(y_true, y_pred)
    rr = cal_reciprocal_rank_at_k(y_true, y_pred)
    print(recall, precision, ndcg, hr, f1, ap, rr)
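As a sanity check, the NDCG implementation can be compared against sklearn; a minimal sketch (sklearn.metrics.ndcg_score expects 2-D arrays of shape (n_queries, n_items)):

from sklearn.metrics import ndcg_score

def test_ndcg_against_sklearn():
    y_true = np.array([1, 0, 0, 1, 0, 0, 0, 1, 0, 0])
    y_pred = np.array([0.9, 0.2, 0.3, 0.8, 0.1, 0.4, 0.5, 0.7, 0.6, 0.1])
    # the two values should agree up to floating-point error
    print(cal_ndcg_at_k(y_true, y_pred, k=10), ndcg_score([y_true], [y_pred], k=10))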

Regression losses: MAE, MSE

$$ MAE = \frac{1}{N}\sum_{i=1}^N|y_i - \hat{y_i}| $$

$$ MSE = \frac{1}{N}\sum_{i=1}^N(y_i - \hat{y_i})^2 $$

def cal_mean_absolute_error(y_true, y_pred):
    return np.mean(np.abs(y_true - y_pred))

def cal_mean_squared_error(y_true, y_pred):
    return np.mean((y_true - y_pred) ** 2)

def test_mae_and_mse():
    y_true = np.array([1, 0, 0, 1, 0, 0, 0, 1, 0, 0])
    y_pred = np.array([0.9, 0.2, 0.3, 0.8, 0.1, 0.4, 0.5, 0.7, 0.6, 0.1])
    mae = cal_mean_absolute_error(y_true, y_pred)
    mse = cal_mean_squared_error(y_true, y_pred)
    print(mae, mse)
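Both match sklearn's reference implementations; a quick cross-check with the same arrays:

from sklearn.metrics import mean_absolute_error, mean_squared_error

def test_mae_mse_against_sklearn():
    y_true = np.array([1, 0, 0, 1, 0, 0, 0, 1, 0, 0])
    y_pred = np.array([0.9, 0.2, 0.3, 0.8, 0.1, 0.4, 0.5, 0.7, 0.6, 0.1])
    print(mean_absolute_error(y_true, y_pred), mean_squared_error(y_true, y_pred))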

Classification losses: BCE Loss, Softmax Loss

$$ BCE = -\frac{1}{N}\sum_{i=1}^N(y_ilog(\hat{y_i})+ (1-y_i)log(1-\hat{y_i})) $$

$$ Softmax \quad Loss = -\frac{1}{N}\sum_{i=1}^Nlog(\frac{e^{f_{y_i}}}{\sum_{j=1}^Ce^{f_j}}) $$

def cal_binary_cross_entropy_loss(y_true, y_pred):
    epsilon = 1e-15
    y_pred = np.clip(y_pred, epsilon, 1 - epsilon)
    return -np.mean(y_true * np.log(y_pred) + (1 - y_true) * np.log(1 - y_pred))

def cal_softmax_loss(y_true, y_pred):
    # y_true: integer class indices; y_pred: raw logits of shape (N, C)
    y_true_one_hot = np.zeros_like(y_pred)
    y_true_one_hot[np.arange(len(y_true)), y_true] = 1
    # subtract the row-wise max before exponentiating for numerical stability
    exp_scores = np.exp(y_pred - np.max(y_pred, axis=1, keepdims=True))
    softmax_probs = exp_scores / np.sum(exp_scores, axis=1, keepdims=True)
    epsilon = 1e-15
    softmax_probs = np.clip(softmax_probs, epsilon, 1 - epsilon)
    return -np.mean(np.sum(y_true_one_hot * np.log(softmax_probs), axis=1))

def test_bce_and_softmax():
    y_true_bce = np.array([0, 1, 1])
    y_pred_bce = np.array([0.1, 0.9, 0.8])
    y_true_softmax = np.array([0, 2, 1])
    y_pred_softmax = np.array([[0.1, 0.7, 0.2],
                              [0.3, 0.4, 0.3],
                              [0.2, 0.5, 0.3]])
    bce_loss = cal_binary_cross_entropy_loss(y_true_bce, y_pred_bce)
    softmax_loss = cal_softmax_loss(y_true_softmax, y_pred_softmax)
    print(bce_loss, softmax_loss)
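The same values can be reproduced with PyTorch, reusing the arrays from test_bce_and_softmax; F.binary_cross_entropy takes probabilities while F.cross_entropy takes raw logits, matching the two functions above:

def test_losses_against_torch():
    y_true_bce = torch.tensor([0., 1., 1.])
    y_pred_bce = torch.tensor([0.1, 0.9, 0.8])
    y_true_softmax = torch.tensor([0, 2, 1])
    y_pred_softmax = torch.tensor([[0.1, 0.7, 0.2],
                                   [0.3, 0.4, 0.3],
                                   [0.2, 0.5, 0.3]])
    print(F.binary_cross_entropy(y_pred_bce, y_true_bce).item(),
          F.cross_entropy(y_pred_softmax, y_true_softmax).item())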

Ranking loss: BPR Loss

$$ BPR = -\frac{1}{|D|}\sum_{(u, i, j)\in D}log\sigma(\hat{y}_{ui}-\hat{y}_{uj}) $$

def cal_bpr_loss(y_ui, y_uj):
    # y_ui / y_uj: predicted scores for the positive / negative items of the same user
    diff = y_ui - y_uj
    sigmoid_value = 1 / (1 + np.exp(-diff))
    bpr = -np.log(sigmoid_value)
    return np.mean(bpr)  # average over the sampled (u, i, j) triples
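A minimal usage sketch, with assumed positive and negative scores for three sampled (u, i, j) triples:

def test_bpr_loss():
    y_ui = np.array([0.9, 0.8, 0.7])  # scores of positive items (assumed values)
    y_uj = np.array([0.2, 0.4, 0.6])  # scores of sampled negatives (assumed values)
    print(cal_bpr_loss(y_ui, y_uj))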

Contrastive losses: Contrastive Loss, Triplet Loss, NCE Loss, InfoNCE Loss

$$ Contrastive\quad Loss = \frac{1}{2N}\sum_{i=1}^N(y_id_i^2 + (1-y_i)max(0, m -d_i)^2) $$

$$ Triplet \quad Loss = \frac{1}{N}\sum_{i=1}^Nmax(0, d(a_i, p_i) - d(a_i, n_i) + m) $$

$$ NCE \quad Loss = -\frac{1}{N}\sum_{i=1}^N(log\sigma(f(x_i, y_i))+\sum_{j=1}^Klog\sigma(-f(x_i, y_j))) $$

$$ InfoNCE \quad Loss = -\frac{1}{N}\sum_{i=1}^Nlog(\frac{e^{f(x_i, y_i)}}{\sum_{j=1}^Ne^{f(x_i, y_j)}}) $$

def cal_contrastive_loss(y_true, d, margin=1.0):
    loss = y_true * d ** 2 + (1 - y_true) * np.maximum(0, margin - d) ** 2
    return np.mean(loss) / 2 

def cal_triplet_loss(d_pos, d_neg, margin=1.0):
    loss = np.maximum(0, d_pos - d_neg + margin)
    return np.mean(loss)

def cal_nce_loss(f_pos, f_neg):
    # positive term: log sigmoid(f_pos); negative term: log sigmoid(-f_neg)
    pos_loss = np.log(1 / (1 + np.exp(-f_pos)))
    neg_loss = np.sum(np.log(1 / (1 + np.exp(f_neg))), axis=1)
    return -np.mean(pos_loss + neg_loss)

def cal_infonce_loss(f_pos, f_neg):
    logits = np.concatenate([f_pos.reshape(-1, 1), f_neg], axis=1)
    exp_logits = np.exp(logits)
    loss = -np.log(exp_logits[:, 0] / np.sum(exp_logits, axis=1))
    return np.mean(loss)

def test_contrastive_loss():
    y_true_contrastive = np.array([1, 0, 1, 0])
    d_contrastive = np.array([0.5, 1.5, 0.8, 1.2]) 
    d_pos_triplet = np.array([0.5, 0.8, 0.7])
    d_neg_triplet = np.array([1.5, 1.2, 1.3])
    f_pos_nce = np.array([2.0, 1.5, 1.8])
    f_neg_nce = np.array([[0.5, 0.3], [0.2, 0.4], [0.1, 0.6]])

    contrastive_loss = cal_contrastive_loss(y_true_contrastive, d_contrastive, margin=1.0)
    triplet_loss = cal_triplet_loss(d_pos_triplet, d_neg_triplet, margin=1.0)
    nce_loss = cal_nce_loss(f_pos_nce, f_neg_nce)
    infonce_loss = cal_infonce_loss(f_pos_nce, f_neg_nce)
    print(contrastive_loss, triplet_loss, nce_loss, infonce_loss)
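In practice InfoNCE is usually applied to similarities divided by a temperature hyperparameter τ (commonly somewhere around 0.05 to 0.2); a minimal variant of cal_infonce_loss under that assumption, with a max-subtraction for numerical stability:

def cal_infonce_loss_with_temp(f_pos, f_neg, tau=0.1):
    # column 0 holds the positive logit, the remaining columns are negatives
    logits = np.concatenate([f_pos.reshape(-1, 1), f_neg], axis=1) / tau
    logits -= np.max(logits, axis=1, keepdims=True)  # stabilize exp
    exp_logits = np.exp(logits)
    return np.mean(-np.log(exp_logits[:, 0] / np.sum(exp_logits, axis=1)))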

Distribution-matching loss: KL Loss

$$ KL(P||Q) = \sum_{i=1}^NP(i)log(\frac{P(i)}{Q(i)}) $$

def cal_kl_divergence(p, q):
    # clip both distributions away from 0 to avoid log(0) and 0 * inf
    epsilon = 1e-15
    p = np.clip(p, epsilon, 1 - epsilon)
    q = np.clip(q, epsilon, 1 - epsilon)
    return np.sum(p * np.log(p / q))

def test_kl_loss():
    p = np.array([0.1, 0.4, 0.5])
    q = np.array([0.2, 0.3, 0.5])
    kl_loss = cal_kl_divergence(p, q)
    print(kl_loss)
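scipy computes the same quantity, so the result is easy to verify (scipy.stats.entropy(p, q) returns the KL divergence when given two distributions):

from scipy.stats import entropy

def test_kl_against_scipy():
    p = np.array([0.1, 0.4, 0.5])
    q = np.array([0.2, 0.3, 0.5])
    print(cal_kl_divergence(p, q), entropy(p, q))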
Last Modified: May 8, 2025