MultiHeadAttention
Evaluation metrics: AUC, GAUC, Precision, Recall, NDCG, HR, F1, MAP, MRR
Loss functions: MAE, MSE, BCE Loss, Softmax Loss, BPR Loss, Contrastive Loss, Triplet Loss, NCE Loss, InfoNCE Loss, KL Loss
MultiHeadAttention
import torch
import torch.nn as nn
import torch.nn.functional as F

class MultiHeadAttention(nn.Module):
    def __init__(self, embed_dim, num_heads, dropout_rate=0.1):
        super().__init__()
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        assert self.embed_dim % self.num_heads == 0
        self.head_dim = self.embed_dim // self.num_heads
        self.Wq = nn.Linear(self.embed_dim, self.embed_dim, bias=False)
        self.Wk = nn.Linear(self.embed_dim, self.embed_dim, bias=False)
        self.Wv = nn.Linear(self.embed_dim, self.embed_dim, bias=False)
        self.Wo = nn.Linear(self.embed_dim, self.embed_dim, bias=False)  # output projection W^O from the Transformer paper
        self.dropout = nn.Dropout(dropout_rate)

    def forward(self, q, k, v, mask=None):
        bs, seq_len, dim = q.shape
        assert dim == self.embed_dim
        # project and split into heads: (bs, num_heads, seq_len, head_dim)
        q = self.Wq(q).view(bs, -1, self.num_heads, self.head_dim).transpose(1, 2)
        k = self.Wk(k).view(bs, -1, self.num_heads, self.head_dim).transpose(1, 2)
        v = self.Wv(v).view(bs, -1, self.num_heads, self.head_dim).transpose(1, 2)
        # scaled dot-product attention: (bs, num_heads, seq_len, seq_len)
        scores = torch.matmul(q, k.transpose(-1, -2)) / (self.head_dim ** 0.5)
        if mask is not None:
            scores = scores.masked_fill(mask == 0, -1e9)
        attention_weights = F.softmax(scores, dim=-1)
        attention_weights = self.dropout(attention_weights)
        # merge heads back: (bs, seq_len, embed_dim)
        output = torch.matmul(attention_weights, v).transpose(1, 2).contiguous().view(bs, seq_len, self.embed_dim)
        return self.Wo(output)

def test():
    m = MultiHeadAttention(100, 2)
    q = torch.randn(100, 3, 100)  # (batch, seq_len, embed_dim)
    o = m(q, q, q)
    print(o.shape)  # torch.Size([100, 3, 100])
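As a sanity check on shapes (not values, since the two modules initialize their weights independently), the hand-written module can be run next to PyTorch's built-in nn.MultiheadAttention. A minimal sketch, assuming PyTorch >= 1.9 for the batch_first flag:

def test_against_builtin():
    ours = MultiHeadAttention(100, 2)
    builtin = nn.MultiheadAttention(100, 2, batch_first=True)
    q = torch.randn(100, 3, 100)
    out_ours = ours(q, q, q)
    out_builtin, _ = builtin(q, q, q)  # returns (output, attention weights)
    assert out_ours.shape == out_builtin.shape == (100, 3, 100)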
AUC and GAUC
| Ground truth \ Prediction | Positive | Negative |
|---|---|---|
| Positive | TP | FN |
| Negative | FP | TN |
$$ TPR = \frac{TP}{TP + FN}, FPR = \frac{FP}{TN + FP} $$
Definition of AUC
- The area under the ROC curve (TPR on the y-axis, FPR on the x-axis).
- The probability that a randomly sampled positive example receives a higher prediction than a randomly sampled negative example.
import numpy as np

# Definition 2: probability that a random positive ranks above a random negative
def cal_auc(y_true, y_pred):
    fz, fm = 0, 0  # numerator (concordant pairs), denominator (all pos-neg pairs)
    for i in range(0, len(y_true) - 1):
        for j in range(i + 1, len(y_true)):
            if y_true[i] != y_true[j]:
                fm += 1
                if (y_true[i] > y_true[j] and y_pred[i] > y_pred[j]) or (y_true[i] < y_true[j] and y_pred[i] < y_pred[j]):
                    fz += 1
                elif y_pred[i] == y_pred[j]:
                    fz += 0.5  # tied predictions count as half a concordant pair
    return fz / fm
# Definition 1: area under the ROC curve, accumulated with the trapezoidal rule
def cal_auc2(y_true, y_pred):
    sort_indices = np.argsort(y_pred)[::-1]  # sort by prediction, descending
    y_true_sorted = y_true[sort_indices]
    tp, fp = 0, 0
    auc = 0
    prev_fpr, prev_tpr = 0, 0
    n = len(y_true)
    num_pos = sum(y_true)
    for label in y_true_sorted:
        if label == 1:
            tp += 1
        else:
            fp += 1
        tpr = tp / num_pos
        fpr = fp / (n - num_pos)
        auc += (fpr - prev_fpr) * (prev_tpr + tpr) / 2  # trapezoid between consecutive ROC points
        prev_fpr = fpr
        prev_tpr = tpr
    return auc
def cal_gauc(y_true, y_pred, group_ids, weights):
    groups = np.unique(group_ids)
    total_auc = 0
    total_weight = 0
    assert len(weights) == len(groups)
    for g in groups:
        mask = group_ids == g
        y_true_g = y_true[mask]
        y_pred_g = y_pred[mask]
        # skip groups that lack both positive and negative samples (AUC undefined)
        if len(np.unique(y_true_g)) < 2:
            continue
        auc_g = cal_auc(y_true_g, y_pred_g)
        total_auc += auc_g * weights[g]
        total_weight += weights[g]
    return total_auc / total_weight if total_weight != 0 else 0
from sklearn.metrics import roc_auc_score

def test_auc():
    y_true = np.array([1, 0, 0, 0, 1, 0, 1, 0, 0, 1])
    y_pred = np.array([0.9, 0.4, 0.3, 0.1, 0.35, 0.6, 0.65, 0.32, 0.8, 0.7])
    groups = np.array([0, 0, 0, 1, 1, 1, 2, 2, 2, 2])
    weights = {0: 1, 1: 1, 2: 1}
    auc1 = cal_auc(y_true, y_pred)
    auc2 = cal_auc2(y_true, y_pred)
    auc3 = roc_auc_score(y_true, y_pred)  # reference implementation
    gauc = cal_gauc(y_true, y_pred, groups, weights)
    print(auc1, auc2, auc3, gauc)
Recall, Precision, NDCG, HR, F1, MAP, MRR
$$ Recall@K = \frac{TP@K}{TP@K + FN@K} $$
$$ Precision@K = \frac{TP@K}{TP@K + FP@K} $$
$$ NDCG@K=\frac{DCG@K}{IDCG@K}, \quad DCG@K=\sum_{i=1}^K\frac{rel_i}{\log_2(i+1)}, \quad IDCG@K=\sum_{i=1}^K\frac{rel_i^{sorted}}{\log_2(i+1)} $$
$$ HR@K = 1 \quad if \quad TP@K > 0 \quad else \quad 0 $$
$$ F1@K = 2 \cdot \frac{Precision@K \cdot Recall@K}{Precision@K + Recall@K} $$
$$ MAP@K = \frac{1}{|Q|}\sum_{q=1}^{|Q|}AP@K(q), \quad AP@K(q)=\frac{1}{\min(K, TP@K + FN@K)}\sum_{k=1}^K Precision@k \cdot rel_k $$
$$ MRR@K = \frac{1}{|Q|}\sum_{q=1}^{|Q|}RR@K(q), \quad RR@K(q) = \frac{1}{rank_q}\quad if \quad rank_q\le K\quad else \quad 0 $$
def cal_recall_at_k(y_true, y_pred, k=10):
    top_k = np.argsort(y_pred)[::-1][:k]
    hits = y_true[top_k]
    total_positive = np.sum(y_true)
    return np.sum(hits) / total_positive if total_positive != 0 else 0

def cal_precision_at_k(y_true, y_pred, k=10):
    top_k = np.argsort(y_pred)[::-1][:k]
    hits = y_true[top_k]
    return np.sum(hits) / k

def cal_dcg_at_k(y_true, y_pred, k=10):
    order = np.argsort(y_pred)[::-1]
    y_true_sorted = y_true[order]
    discounts = 1 / np.log2(np.arange(2, k + 2))  # 1/log2(i+1) for i = 1..k
    dcg = np.sum(y_true_sorted[:k] * discounts)
    return dcg

def cal_ndcg_at_k(y_true, y_pred, k=10):
    dcg = cal_dcg_at_k(y_true, y_pred, k)
    ideal_y_true = np.sort(y_true)[::-1]  # ideal ranking: all relevant items first
    idcg = cal_dcg_at_k(ideal_y_true, ideal_y_true, k)
    return dcg / idcg if idcg else 0

def cal_hit_rate_at_k(y_true, y_pred, k=10):
    top_k = np.argsort(y_pred)[::-1][:k]
    hits = y_true[top_k]
    return 1 if np.sum(hits) else 0

def cal_f1_at_k(y_true, y_pred, k=10):
    precision = cal_precision_at_k(y_true, y_pred, k)
    recall = cal_recall_at_k(y_true, y_pred, k)
    if precision + recall == 0:
        return 0
    return 2 * (precision * recall) / (precision + recall)

def cal_average_precision_at_k(y_true, y_pred, k=10):
    ap = 0
    total_positives = np.sum(y_true)
    if total_positives == 0:
        return 0
    order = np.argsort(y_pred)[::-1]
    for i in range(1, k + 1):
        precision = cal_precision_at_k(y_true, y_pred, i)
        rel = y_true[order[i - 1]]  # whether the item at rank i is relevant
        ap += precision * rel
    return ap / min(k, total_positives)

def cal_reciprocal_rank_at_k(y_true, y_pred, k=10):
    rank = np.argsort(y_pred)[::-1]
    for i, idx in enumerate(rank):
        if y_true[idx] == 1:  # first relevant item decides the reciprocal rank
            return 1 / (i + 1) if i + 1 <= k else 0
    return 0
def test_metrics():
    y_true = np.array([1, 0, 0, 1, 0, 0, 0, 1, 0, 0])
    y_pred = np.array([0.9, 0.2, 0.3, 0.8, 0.1, 0.4, 0.5, 0.7, 0.6, 0.1])
    recall = cal_recall_at_k(y_true, y_pred)
    precision = cal_precision_at_k(y_true, y_pred)
    ndcg = cal_ndcg_at_k(y_true, y_pred)
    hr = cal_hit_rate_at_k(y_true, y_pred)
    f1 = cal_f1_at_k(y_true, y_pred)
    ap = cal_average_precision_at_k(y_true, y_pred)
    rr = cal_reciprocal_rank_at_k(y_true, y_pred)
    print(recall, precision, ndcg, hr, f1, ap, rr)
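The NDCG implementation can be cross-checked against scikit-learn's ndcg_score, in the same spirit as the roc_auc_score check above. A minimal sketch, assuming scikit-learn >= 0.22; note ndcg_score expects 2D arrays of shape (n_queries, n_items):

from sklearn.metrics import ndcg_score

def test_ndcg_against_sklearn():
    y_true = np.array([1, 0, 0, 1, 0, 0, 0, 1, 0, 0])
    y_pred = np.array([0.9, 0.2, 0.3, 0.8, 0.1, 0.4, 0.5, 0.7, 0.6, 0.1])
    print(cal_ndcg_at_k(y_true, y_pred, k=10), ndcg_score([y_true], [y_pred], k=10))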
Regression losses: MAE, MSE
$$ MAE = \frac{1}{N}\sum_{i=1}^N|y_i - \hat{y_i}| $$
$$ MSE = \frac{1}{N}\sum_{i=1}^N(y_i - \hat{y_i})^2 $$
def cal_mean_absolute_error(y_true, y_pred):
    return np.mean(np.abs(y_true - y_pred))

def cal_mean_squared_error(y_true, y_pred):
    return np.mean((y_true - y_pred) ** 2)

def test_mae_and_mse():
    y_true = np.array([1, 0, 0, 1, 0, 0, 0, 1, 0, 0])
    y_pred = np.array([0.9, 0.2, 0.3, 0.8, 0.1, 0.4, 0.5, 0.7, 0.6, 0.1])
    mae = cal_mean_absolute_error(y_true, y_pred)
    mse = cal_mean_squared_error(y_true, y_pred)
    print(mae, mse)
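For a quick consistency check, scikit-learn's reference implementations can be compared against the two functions above (scikit-learn is already a dependency via roc_auc_score):

from sklearn.metrics import mean_absolute_error, mean_squared_error

def test_mae_mse_against_sklearn():
    y_true = np.array([1, 0, 0, 1, 0, 0, 0, 1, 0, 0])
    y_pred = np.array([0.9, 0.2, 0.3, 0.8, 0.1, 0.4, 0.5, 0.7, 0.6, 0.1])
    print(mean_absolute_error(y_true, y_pred), mean_squared_error(y_true, y_pred))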
Classification losses: BCE Loss, Softmax Loss
$$ BCE = -\frac{1}{N}\sum_{i=1}^N(y_i\log(\hat{y_i})+ (1-y_i)\log(1-\hat{y_i})) $$
$$ Softmax \quad Loss = -\frac{1}{N}\sum_{i=1}^N\log(\frac{e^{f_{y_i}}}{\sum_{j=1}^Ce^{f_j}}) $$
def cal_binary_cross_entropy_loss(y_true, y_pred):
    epsilon = 1e-15
    y_pred = np.clip(y_pred, epsilon, 1 - epsilon)  # avoid log(0)
    return -np.mean(y_true * np.log(y_pred) + (1 - y_true) * np.log(1 - y_pred))

def cal_softmax_loss(y_true, y_pred):
    y_true_one_hot = np.zeros_like(y_pred)
    y_true_one_hot[np.arange(len(y_true)), y_true] = 1
    # subtract the row max before exponentiating for numerical stability
    exp_scores = np.exp(y_pred - np.max(y_pred, axis=1, keepdims=True))
    softmax_probs = exp_scores / np.sum(exp_scores, axis=1, keepdims=True)
    epsilon = 1e-15
    softmax_probs = np.clip(softmax_probs, epsilon, 1 - epsilon)
    return -np.mean(np.sum(y_true_one_hot * np.log(softmax_probs), axis=1))
def test_bce_and_softmax():
    y_true_bce = np.array([0, 1, 1])
    y_pred_bce = np.array([0.1, 0.9, 0.8])
    y_true_softmax = np.array([0, 2, 1])
    y_pred_softmax = np.array([[0.1, 0.7, 0.2],
                               [0.3, 0.4, 0.3],
                               [0.2, 0.5, 0.3]])
    bce_loss = cal_binary_cross_entropy_loss(y_true_bce, y_pred_bce)
    softmax_loss = cal_softmax_loss(y_true_softmax, y_pred_softmax)
    print(bce_loss, softmax_loss)
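The same values can be reproduced with PyTorch's built-in losses, which makes a useful sanity check. A minimal sketch: F.binary_cross_entropy takes probabilities, while F.cross_entropy takes raw logits, which matches how cal_softmax_loss treats y_pred.

def test_against_torch():
    y_true_bce = torch.tensor([0.0, 1.0, 1.0])
    y_pred_bce = torch.tensor([0.1, 0.9, 0.8])
    y_true_softmax = torch.tensor([0, 2, 1])
    y_pred_softmax = torch.tensor([[0.1, 0.7, 0.2],
                                   [0.3, 0.4, 0.3],
                                   [0.2, 0.5, 0.3]])
    print(F.binary_cross_entropy(y_pred_bce, y_true_bce).item(),
          F.cross_entropy(y_pred_softmax, y_true_softmax).item())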
Ranking loss: BPR
$$ BPR = -\frac{1}{N}\sum_{(u, i, j)\in D}\log\sigma(\hat{y}_{ui}-\hat{y}_{uj}) $$
def cal_bpr_loss(y_ui, y_uj):
    diff = y_ui - y_uj  # positive-item score minus negative-item score
    sigmoid_value = 1 / (1 + np.exp(-diff))
    return np.mean(-np.log(sigmoid_value))  # average over the (u, i, j) triples
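Unlike the other losses, no test accompanies this one; a minimal usage sketch with made-up scores, where y_ui holds the positive-item scores and y_uj the sampled negative-item scores:

def test_bpr_loss():
    y_ui = np.array([2.0, 1.5, 0.8])  # scores for items the user interacted with
    y_uj = np.array([0.5, 1.0, 1.2])  # scores for sampled negative items
    print(cal_bpr_loss(y_ui, y_uj))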
Contrastive losses: Contrastive Loss, Triplet Loss, NCE Loss, InfoNCE Loss
$$ Contrastive\quad Loss = \frac{1}{2N}\sum_{i=1}^N(y_id_i^2 + (1-y_i)\max(0, m - d_i)^2) $$
$$ Triplet \quad Loss = \frac{1}{N}\sum_{i=1}^N\max(0, d(a_i, p_i) - d(a_i, n_i) + m) $$
$$ NCE \quad Loss = -\frac{1}{N}\sum_{i=1}^N(\log\sigma(f(x_i, y_i))+\sum_{j=1}^K\log\sigma(-f(x_i, y_j))) $$
$$ InfoNCE \quad Loss = -\frac{1}{N}\sum_{i=1}^N\log(\frac{e^{f(x_i, y_i)}}{\sum_{j=1}^Ne^{f(x_i, y_j)}}) $$
def cal_contrastive_loss(y_true, d, margin=1.0):
    # positive pairs pull together (d^2); negative pairs push apart up to the margin
    loss = y_true * d ** 2 + (1 - y_true) * np.maximum(0, margin - d) ** 2
    return np.mean(loss) / 2

def cal_triplet_loss(d_pos, d_neg, margin=1.0):
    # anchor-positive distance should beat anchor-negative by at least the margin
    loss = np.maximum(0, d_pos - d_neg + margin)
    return np.mean(loss)

def cal_nce_loss(f_pos, f_neg):
    pos_loss = np.log(1 / (1 + np.exp(-f_pos)))  # log sigmoid(f_pos)
    neg_loss = np.sum(np.log(1 / (1 + np.exp(f_neg))), axis=1)  # log sigmoid(-f_neg), per the formula
    return -np.mean(pos_loss + neg_loss)

def cal_infonce_loss(f_pos, f_neg):
    # softmax over [positive, negatives]; the positive sits at column 0
    logits = np.concatenate([f_pos.reshape(-1, 1), f_neg], axis=1)
    exp_logits = np.exp(logits)
    loss = -np.log(exp_logits[:, 0] / np.sum(exp_logits, axis=1))
    return np.mean(loss)
def test_contrastive_loss():
    y_true_contrastive = np.array([1, 0, 1, 0])
    d_contrastive = np.array([0.5, 1.5, 0.8, 1.2])
    d_pos_triplet = np.array([0.5, 0.8, 0.7])
    d_neg_triplet = np.array([1.5, 1.2, 1.3])
    f_pos_nce = np.array([2.0, 1.5, 1.8])
    f_neg_nce = np.array([[0.5, 0.3], [0.2, 0.4], [0.1, 0.6]])
    contrastive_loss = cal_contrastive_loss(y_true_contrastive, d_contrastive, margin=1.0)
    triplet_loss = cal_triplet_loss(d_pos_triplet, d_neg_triplet, margin=1.0)
    nce_loss = cal_nce_loss(f_pos_nce, f_neg_nce)
    infonce_loss = cal_infonce_loss(f_pos_nce, f_neg_nce)
    print(contrastive_loss, triplet_loss, nce_loss, infonce_loss)
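InfoNCE is exactly a softmax cross-entropy in which the positive always occupies index 0 of the logits, so it can be cross-checked against F.cross_entropy. A minimal sketch, assuming PyTorch is available:

def test_infonce_against_torch():
    f_pos = np.array([2.0, 1.5, 1.8])
    f_neg = np.array([[0.5, 0.3], [0.2, 0.4], [0.1, 0.6]])
    logits = torch.tensor(np.concatenate([f_pos.reshape(-1, 1), f_neg], axis=1))
    targets = torch.zeros(len(f_pos), dtype=torch.long)  # positive is class 0
    print(F.cross_entropy(logits, targets).item())  # should match cal_infonce_loss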
Distribution-matching loss: KL Loss
$$ KL(P||Q) = \sum_{i=1}^NP(i)\log(\frac{P(i)}{Q(i)}) $$
def cal_kl_divergence(p, q):
    epsilon = 1e-15
    p = np.clip(p, epsilon, 1 - epsilon)  # avoid log(0) and division by zero
    q = np.clip(q, epsilon, 1 - epsilon)
    return np.sum(p * np.log(p / q))

def test_kl_loss():
    p = np.array([0.1, 0.4, 0.5])
    q = np.array([0.2, 0.3, 0.5])
    kl_loss = cal_kl_divergence(p, q)
    print(kl_loss)
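scipy.stats.entropy computes the same quantity when given two distributions, which makes for a quick cross-check; assuming SciPy is installed:

from scipy.stats import entropy

def test_kl_against_scipy():
    p = np.array([0.1, 0.4, 0.5])
    q = np.array([0.2, 0.3, 0.5])
    print(entropy(p, q))  # KL(p || q), should match cal_kl_divergence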