I. Principles at a Glance (vs. the Official Open-Source Repo)
Overall flow (the For You timeline)
Candidate retrieval (Candidate Sources) → light ranking (Light Ranker) / heavy ranking (Heavy Ranker) → mixing and filtering (Mixer & Filters). The official repo spells out the key components and their responsibilities:
Candidate sources:
- In-network recall (recent tweets from accounts you follow) plus out-of-network exploration driven by graph, community, and embedding signals (UTEG/GraphJet, SimClusters, TwHIN).
Representations and graph features (a toy embedding-retrieval sketch follows this outline):
- SimClusters: sparse representations built on overlapping communities, supporting large-scale candidate generation and ANN retrieval. (Venu Satuluri)
- TwHIN: learns dense vectors by placing users, tweets, advertisers, etc. in a single heterogeneous graph, with gains across multiple online scenarios. (arXiv)
- GraphJet: a real-time graph engine that backs UTEG / collaborative-filtering style out-of-network retrieval. (VLDB)
Ranking:
- A cheap light ranker first prunes the candidate pool; the Heavy Ranker then scores the survivors with a learned engagement model.
Mixing and visibility:
- Home Mixer aggregates multi-source candidates, re-ranks them, and applies heuristics such as diversity, author de-duplication, and negative feedback; Visibility handles legal- and safety-related filtering and downranking. (GitHub)
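To make the embedding side concrete, here is a toy sketch (my own illustration, not code from the repo) of how dense user/tweet vectors can back candidate retrieval: the random arrays stand in for TwHIN/SimClusters-style embeddings, and sklearn's brute-force NearestNeighbors stands in for a production ANN index.

# Toy sketch: embedding-based candidate retrieval (illustrative stand-in for TwHIN/SimClusters vectors plus an ANN index; all data here is random).
import numpy as np
from sklearn.neighbors import NearestNeighbors

rng = np.random.default_rng(0)
user_vecs = rng.normal(size=(1000, 64)).astype(np.float32)    # stand-in user embeddings
tweet_vecs = rng.normal(size=(50000, 64)).astype(np.float32)  # tweet embeddings in the same latent space

# Brute-force cosine neighbors; real systems swap in an ANN index (e.g. HNSW/Faiss) for scale.
index = NearestNeighbors(n_neighbors=200, metric="cosine").fit(tweet_vecs)
dist, idx = index.kneighbors(user_vecs[:1])   # top-200 candidate tweets for user 0
candidate_tweet_ids = idx[0]
print(candidate_tweet_ids[:10])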
A sense of scale and proportion
The official Retrieval Signals documentation is explicit: the candidate stage compresses roughly a billion pieces of content down to a few thousand for downstream ranking. (GitHub)
Press and engineering write-ups often quote "about 1,500 initial candidates with a roughly 50/50 in-/out-of-network target", which is consistent with the official README's "~50% from in-network", but the exact numbers vary dynamically by user and policy (they are not hard-coded). (GitHub)
In short, you can read the official open-source release as: in-network recall + out-of-network exploration (graph / community / embedding) to generate candidates, followed by two-stage light/heavy ranking, and finally mixing plus rule/safety handling. The core names (Home-Mixer, UTEG/GraphJet, SimClusters, TwHIN, Heavy Ranker) all map one-to-one to the repo and the papers. (GitHub)
II. Hands-On: Rebuilding a Runnable Mini Pipeline in Python
Goal: build a teaching version on local data that is "isomorphic" to X's architecture: two-source recall (in-network / out-of-network) → light ranking → heavy ranking → mixing and filtering.
Note: this is a teaching reference implementation focused on "structural alignment + reusability", not industrial-grade throughput or latency.
0) Data conventions
You can organize your own business data into the following CSVs (minimal set of columns):
- users.csv: user_id, followers_count, following_count
- follows.csv: user_id, followee_id (follow edges)
- tweets.csv: tweet_id, author_id, created_at, has_media, has_url
- interactions.csv: user_id, tweet_id, type, ts, where type ∈ {click, like, retweet, reply}
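If you have no business data at hand, a small synthetic generator along these lines (my own sketch; only the column names follow the schema above, all values are random) will produce CSVs that the pipeline below can consume, once the dependencies from the next step are installed:

# make_toy_data.py -- generate tiny synthetic CSVs matching the schema above (illustrative only).
import numpy as np, pandas as pd

rng = np.random.default_rng(42)
n_users, n_tweets = 200, 2000

users = pd.DataFrame({"user_id": range(n_users),
                      "followers_count": rng.integers(0, 5000, n_users),
                      "following_count": rng.integers(0, 500, n_users)})
follows = pd.DataFrame({"user_id": rng.integers(0, n_users, 3000),
                        "followee_id": rng.integers(0, n_users, 3000)}).drop_duplicates()
follows = follows[follows["user_id"] != follows["followee_id"]]
tweets = pd.DataFrame({"tweet_id": range(n_tweets),
                       "author_id": rng.integers(0, n_users, n_tweets),
                       "created_at": pd.Timestamp("2024-01-01") + pd.to_timedelta(rng.integers(0, 14*24, n_tweets), unit="h"),
                       "has_media": rng.integers(0, 2, n_tweets),
                       "has_url": rng.integers(0, 2, n_tweets)})
inter = pd.DataFrame({"user_id": rng.integers(0, n_users, 20000),
                      "tweet_id": rng.integers(0, n_tweets, 20000),
                      "type": rng.choice(["click", "like", "retweet", "reply"], 20000),
                      "ts": pd.Timestamp("2024-01-01") + pd.to_timedelta(rng.integers(0, 15*24, 20000), unit="h")})
for name, df in [("users", users), ("follows", follows), ("tweets", tweets), ("interactions", inter)]:
    df.to_csv(f"{name}.csv", index=False)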
1) Install dependencies
pip install pandas numpy scikit-learn
Note: only CPU-friendly sklearn is used throughout; if you want deep models, swap the "heavy ranking" stage for a PyTorch MLP / two-tower model.
2) End-to-end code (save as mini_twitter_rec.py and run directly)
# -*- coding: utf-8 -*-
# Teaching-version For-You recommendation pipeline: two-source recall → light ranking (LogReg) → heavy ranking (MLPClassifier) → mixing & filtering
import math, time, random
from datetime import datetime, timedelta
import numpy as np, pandas as pd
from sklearn.preprocessing import MinMaxScaler
from sklearn.decomposition import TruncatedSVD
from sklearn.cluster import KMeans
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.linear_model import LogisticRegression
from sklearn.neural_network import MLPClassifier
# -----------------------------
# Load data (replace with your own paths)
# -----------------------------
users = pd.read_csv("users.csv")
follows = pd.read_csv("follows.csv")
tweets = pd.read_csv("tweets.csv", parse_dates=["created_at"])
inter = pd.read_csv("interactions.csv", parse_dates=["ts"])
NOW = tweets["created_at"].max() + pd.Timedelta(hours=1)
# -----------------------------
# Hyperparameters (tune freely; teaching values)
# -----------------------------
MAX_IN_CAND = 600        # max in-network candidates
MAX_OUT_CAND = 900       # max out-of-network candidates
RECENT_DAYS = 14         # only recall tweets from the last 14 days
LIGHT_TOPK = 400         # kept after light ranking
FINAL_K = 100            # final number of items returned
AUTHOR_PER_USER_CAP = 3  # per-author cap (diversity)
TARGET_IN_RATIO = 0.5    # target in-network ratio
# Engagement weights (labels/sample weights loosely follow the Heavy Ranker's "weighted engagement" idea; the real weights are not hard-coded)
ENG_W = {"click": 0.5, "like": 1.0, "retweet": 1.4, "reply": 1.6}
# -----------------------------
# Feature prep: user-tweet interaction matrix → SVD embeddings (a teaching stand-in for TwHIN/SimClusters)
# -----------------------------
# 1) Build the user-tweet matrix (binary: interacted or not; conceptually sparse, stored dense here for simplicity)
ui = inter.copy()
ui["w"] = ui["type"].map(ENG_W).fillna(0.3) # 不识别的互动给一个小权重
ui_last = ui.sort_values("ts").drop_duplicates(["user_id","tweet_id"], keep="last")
user_index = {u:i for i,u in enumerate(ui_last["user_id"].unique())}
tweet_index = {t:i for i,t in enumerate(ui_last["tweet_id"].unique())}
U, T = len(user_index), len(tweet_index)
X = np.zeros((U, T), dtype=np.float32)
for _, row in ui_last.iterrows():
    X[user_index[row.user_id], tweet_index[row.tweet_id]] = 1.0
# 2) SVD to obtain user/tweet embeddings
EMB_DIM = 64
if min(U, T) <= EMB_DIM: EMB_DIM = max(2, min(U, T) - 1)  # TruncatedSVD needs n_components < n_features
svd = TruncatedSVD(n_components=EMB_DIM, random_state=42)
U_emb = svd.fit_transform(X)   # user embeddings
T_emb = svd.components_.T      # tweet embeddings (approximation)
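# Note: TruncatedSVD factorizes X as approximately U_emb @ svd.components_, so the rows of U_emb and of
# components_.T share one latent space; their dot product approximates the reconstructed user-tweet
# interaction, which is what the emb_sim feature measures later on.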
# Build author embeddings (mean of their tweets' embeddings)
tweet2author = tweets.set_index("tweet_id")["author_id"].to_dict()
author_list = tweets["author_id"].unique().tolist()
author_index = {a:i for i,a in enumerate(author_list)}
A = len(author_index)
A_emb = np.zeros((A, EMB_DIM), dtype=np.float32)
A_cnt = np.zeros(A, dtype=np.int32)
for tid, j in tweet_index.items():
    a = tweet2author.get(tid, None)
    if a in author_index:
        A_emb[author_index[a]] += T_emb[j]
        A_cnt[author_index[a]] += 1
A_emb = A_emb / np.maximum(1, A_cnt)[:,None]
# 3) SimClusters-lite: k-means over author embeddings to form "communities" as a sparse membership
K = min(A, max(20, int(math.sqrt(A))))  # clamp so n_clusters never exceeds the number of authors
kmeans = KMeans(n_clusters=K, n_init=5, random_state=42).fit(A_emb)
author_cluster = {author_list[i]: int(c) for i,c in enumerate(kmeans.labels_)}
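# author_cluster maps each author to a k-means "community" id; recall_out_network uses it below to
# pull tweets from authors in the communities the user engages with most.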
# -----------------------------
# Recall (candidate generation)
# -----------------------------
tweets_recent = tweets[tweets["created_at"] >= (NOW - pd.Timedelta(days=RECENT_DAYS))].copy()
tweets_recent = tweets_recent[~tweets_recent["author_id"].isna()]
# 构造辅助索引
followees = follows.groupby("user_id")["followee_id"].apply(set).to_dict()
tweet_by_author = tweets_recent.groupby("author_id")["tweet_id"].apply(list).to_dict()
eng_by_tweet = inter.groupby("tweet_id")["user_id"].apply(set).to_dict()
user_followees = lambda u: followees.get(u, set())
user_seen = inter.groupby("user_id")["tweet_id"].apply(set).to_dict()
def recall_in_network(u):
    """Fetch recent tweets from followed accounts (coarsely ordered by freshness and 'social proof')."""
    fs = list(user_followees(u))
    cand = []
    for a in fs:
        for t in tweet_by_author.get(a, []):
            # Social proof: how many accounts I follow also engaged with this tweet
            sp = len(eng_by_tweet.get(t, set()) & set(fs))
            age_h = (NOW - tweets_recent.loc[tweets_recent["tweet_id"]==t,"created_at"].iloc[0]).total_seconds()/3600.0
            cand.append((t, 1.0/(1.0+age_h), sp))
    cand.sort(key=lambda x: (x[1], x[2]), reverse=True)
    return [c[0] for c in cand[:MAX_IN_CAND]]
def recall_out_network(u):
    """Two out-of-network sources: similar authors + neighboring communities."""
    # Authors the user has interacted with most
    his = inter[inter["user_id"]==u].merge(tweets[["tweet_id","author_id"]], on="tweet_id", how="left")
    top_authors = his["author_id"].value_counts().index.tolist()[:30]
    # Author similarity (cosine)
    ca = []
    for a in top_authors:
        if a not in author_index: continue
        a_vec = A_emb[author_index[a]].reshape(1,-1)
        sims = cosine_similarity(a_vec, A_emb)[0]
        top_idx = np.argsort(-sims)[:200]  # similar authors for each seed author
        for j in top_idx:
            sim_a = author_list[j]
            if sim_a in user_followees(u): continue
            for t in tweet_by_author.get(sim_a, []):
                ca.append((t, float(sims[j])))
    # Community neighbors: authors from the communities the user reads most
    my_clusters = [author_cluster.get(a, -1) for a in top_authors]
    hot_clusters = pd.Series(my_clusters).value_counts().index.tolist()[:5]
    for a in author_list:
        if author_cluster.get(a,-1) in hot_clusters and a not in user_followees(u):
            for t in tweet_by_author.get(a, []):
                ca.append((t, 0.3))  # weak community signal gets a constant similarity
    # Coarse ranking and de-duplication
    seen = set()
    out = []
    for t, s in sorted(ca, key=lambda x: x[1], reverse=True):
        if t in seen: continue
        seen.add(t); out.append(t)
        if len(out)>=MAX_OUT_CAND: break
    return out
# -----------------------------
# Feature engineering (for the light/heavy rankers)
# -----------------------------
def build_features(u, cand_ids, label_from_interactions=True):
    seen = user_seen.get(u, set())
    fs = user_followees(u)
    rows = []
    for t in cand_ids:
        row = {"user_id":u, "tweet_id":t}
        tw = tweets_recent[tweets_recent["tweet_id"]==t].iloc[0]
        a = tw.author_id
        # Basic features
        row["is_in_network"] = 1 if a in fs else 0
        row["recency_h"] = (NOW - tw.created_at).total_seconds()/3600.0
        row["author_reputation"] = float(users.set_index("user_id").loc[a, "followers_count"]) if a in users["user_id"].values else 0.0
        row["has_media"] = int(tw.has_media); row["has_url"] = int(tw.has_url)
        # Embedding similarity
        u_idx = user_index.get(u, None); t_idx = tweet_index.get(t, None)
        if (u_idx is not None) and (t_idx is not None):
            row["emb_sim"] = float(np.dot(U_emb[u_idx], T_emb[t_idx]) / (1e-6 + np.linalg.norm(U_emb[u_idx])*np.linalg.norm(T_emb[t_idx])))
        else:
            row["emb_sim"] = 0.0
        # Social proof: how many accounts I follow engaged with this tweet
        row["social_proof"] = len(eng_by_tweet.get(t, set()) & fs)
        # Label (did the user interact; for training only, absent when scoring online)
        if label_from_interactions:
            row["label"] = 1 if t in seen else 0
        # Sample weight: retweet/reply could get extra weight (simplified to 1.0 here)
        row["weight"] = 1.0
        rows.append(row)
    return pd.DataFrame(rows)
# -----------------------------
# Train / test split (by time)
# -----------------------------
split_time = NOW - pd.Timedelta(days=3)
train_tweets = tweets_recent[tweets_recent["created_at"] < split_time]["tweet_id"].tolist()
test_tweets = tweets_recent[tweets_recent["created_at"] >= split_time]["tweet_id"].tolist()
train_users = inter[inter["tweet_id"].isin(train_tweets)]["user_id"].unique().tolist()
test_users = inter[inter["tweet_id"].isin(test_tweets)]["user_id"].unique().tolist()
def make_dataset(user_ids, for_train=True):
    dfs = []
    for u in user_ids:
        cand_in = recall_in_network(u)
        cand_out = recall_out_network(u)
        cands = cand_in + [t for t in cand_out if t not in cand_in]
        df = build_features(u, cands, label_from_interactions=for_train)
        dfs.append(df)
    return pd.concat(dfs, ignore_index=True) if dfs else pd.DataFrame()
train_df = make_dataset(train_users, for_train=True)
test_df = make_dataset(test_users, for_train=False)   # online scoring carries no label
# -----------------------------
# Light ranking: Logistic Regression
# -----------------------------
feat_cols = ["is_in_network","recency_h","author_reputation","has_media","has_url","emb_sim","social_proof"]
scaler = MinMaxScaler()
Xtr = scaler.fit_transform(train_df[feat_cols]); ytr = train_df["label"].values
lr = LogisticRegression(max_iter=200, n_jobs=1)
lr.fit(Xtr, ytr)
# Score the test set (light-ranking score)
Xte = scaler.transform(test_df[feat_cols])
test_df["light_score"] = lr.predict_proba(Xte)[:,1]
# -----------------------------
# Heavy ranking: MLP (a teaching stand-in for the Heavy Ranker)
# Training: a second stage that re-learns finer-grained interaction preferences
# (for simplicity it is trained on all candidates here; a stricter version would restrict to the light-ranking Top-K)
# -----------------------------
# Backfill light_score on the training set before the second stage (a real system would enforce strict temporal separation / sample construction)
train_df["light_score"] = lr.predict_proba(scaler.transform(train_df[feat_cols]))[:,1]
# Richer second-stage features (light_score added as an input)
feat_cols2 = feat_cols + ["light_score"]
mlp = MLPClassifier(hidden_layer_sizes=(64,32), activation="relu", random_state=42, max_iter=20)
mlp.fit(train_df[feat_cols2], ytr)
test_df["heavy_score"] = mlp.predict_proba(test_df[feat_cols2])[:,1]
# -----------------------------
# Mixing & filtering (diversity, in/out-of-network ratio, author spreading, NSFW/duplicate removal, etc.)
# -----------------------------
# Simplified visibility & filtering: e.g. demo-filter link-heavy content (replace with your own rules / safety-model outputs)
def visibility_ok(row):
    return True  # plug your safety/compliance rule outputs in here
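# Illustrative placeholder rule (my own assumption, not from the official repo): for example, one
# could drop link-only posts from very low-reputation authors:
#     if row.has_url and row.author_reputation < 10: return False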
def blend_for_user(u, df_u):
    # 1) Keep the light-ranking Top-N for the heavy-ranking stage (heavy_score was computed above)
    df_u = df_u.sort_values("light_score", ascending=False).head(LIGHT_TOPK).copy()
    # 2) Walk down by heavy_score while balancing authors and the in/out-of-network ratio
    in_need = int(FINAL_K * TARGET_IN_RATIO)
    out_need = FINAL_K - in_need
    picked, authors_cnt = [], {}
    in_cnt, out_cnt = 0, 0
    for _, r in df_u.sort_values("heavy_score", ascending=False).iterrows():
        if not visibility_ok(r): continue
        t = r.tweet_id
        a = tweets_recent.loc[tweets_recent["tweet_id"]==t, "author_id"].iloc[0]
        # Author spreading
        c = authors_cnt.get(a, 0)
        if c >= AUTHOR_PER_USER_CAP: continue
        # In/out-of-network quota
        if r.is_in_network and in_cnt >= in_need: continue
        if (not r.is_in_network) and out_cnt >= out_need: continue
        picked.append(r)
        authors_cnt[a] = c + 1
        in_cnt += int(r.is_in_network); out_cnt += int(not r.is_in_network)
        if len(picked) >= FINAL_K: break
    return pd.DataFrame(picked)
# Generate recommendations for the test users
results = []
for u in test_df["user_id"].unique():
    df_u = test_df[test_df["user_id"]==u]
    blended = blend_for_user(u, df_u)
    if blended.empty:  # skip users with no surviving candidates
        continue
    blended["rank"] = np.arange(1, len(blended)+1)
    results.append(blended[["user_id","tweet_id","rank","heavy_score","is_in_network"]])
rec = pd.concat(results, ignore_index=True) if results else pd.DataFrame()
print(rec.head(20))
3) How to map this to your business / go further
- Swap the embeddings: replace SVD with your existing user/content vectors (sentence embeddings, multimodal vectors), or move to Node2Vec / two-tower models.
- Out-of-network recall: split recall_out_network into multiple sources (author nearest neighbors, similar topics, community hotspots, long-tail exploration), give each source its own quota, then merge.
- Light / heavy ranking: a linear model or GBDT is more robust for the light ranker; for the heavy ranker, move to a PyTorch MLP / multi-task model with a "weighted engagement" objective (click/like/retweet/reply probabilities combined via business weights), which matches the open-source Heavy Ranker's approach (the weights are not hard-coded; they are tuned through experiments and metrics); see the sketch after this list. (GitHub)
- Mixing & filtering: add author-diversity constraints, content de-duplication, topic balancing, negative-feedback suppression, cold-start protection, and visibility/safety rules (this is exactly what the official visibilitylib is for). (GitHub)
- Metrics: offline AUC / NDCG@K / Recall@K; online CTR, like rate, reply rate, session length, and so on, weighing the trade-offs across multiple objectives.
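For the weighted-engagement objective mentioned above, here is a minimal sklearn sketch (my own illustration, reusing the toy ENG_W weights and the feature frames from the pipeline; the real Heavy Ranker's heads, features, and weights differ): one probability head is trained per engagement type and the predictions are blended with business weights.

# Weighted-engagement reranking sketch: one probability head per engagement type,
# combined with business weights (illustrative; real weights come from experiments).
import numpy as np
from sklearn.neural_network import MLPClassifier

ENG_W = {"click": 0.5, "like": 1.0, "retweet": 1.4, "reply": 1.6}   # same toy weights as in the pipeline

def train_heads(train_df, feat_cols, inter):
    """Train one binary classifier per engagement type on the candidate feature frame."""
    heads = {}
    for etype in ENG_W:
        # Label: did this (user, tweet) pair receive this specific engagement type?
        pos = set(map(tuple, inter.loc[inter["type"] == etype, ["user_id", "tweet_id"]].values))
        y = [(u, t) in pos for u, t in zip(train_df["user_id"], train_df["tweet_id"])]
        if len(set(y)) < 2:   # skip heads with no positive (or no negative) examples
            continue
        clf = MLPClassifier(hidden_layer_sizes=(32,), max_iter=50, random_state=0)
        heads[etype] = clf.fit(train_df[feat_cols], y)
    return heads

def weighted_engagement_score(df, feat_cols, heads):
    """Score = sum over engagement types of business weight * predicted probability."""
    score = np.zeros(len(df))
    for etype, clf in heads.items():
        score += ENG_W[etype] * clf.predict_proba(df[feat_cols])[:, 1]
    return score

# Usage with the frames from the pipeline above:
# heads = train_heads(train_df, feat_cols, inter)
# test_df["heavy_score"] = weighted_engagement_score(test_df, feat_cols, heads)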