? 优质资源分享 ?
学习路线指引(点击解锁) | 知识定位 | 人群定位 |
---|---|---|
? Python实战微信订餐小程序 ? | 进阶级 | 本课程是python flask+微信小程序的完美结合,从项目搭建到腾讯云部署上线,打造一个全栈订餐系统。 |
?Python量化交易实战? | 入门级 | 手把手带你打造一个易扩展、更安全、效率更高的量化交易系统 |
1. 梯度计算式导出
我们在博客《统计学习:逻辑回归与交叉熵损失(Pytorch实现)》中提到,设www为权值(最后一维为偏置),样本总数为NNN,{(xi,yi)}Ni=1{(xi,yi)}Ni=1{(x_i, y_i)}_{i=1}^N为训练样本集。样本维度为DDD,xi∈RD+1x_i\in \mathbb{R}^{D+1}(最后一维扩充),yi∈{0,1}y_i\in{0, 1}。则逻辑回归的损失函数为:
l(w)=N∑i=1[yilogπw(xi)+(1−yi)log(1−πw(xi))]\mathcal{l}(w) = \sum_{i=1}^{N}\left[y_{i} \log \pi_{w}\left(x_{i}\right)+\left(1-y_{i}\right) \log \left(1-\pi_w\left(x_{i}\right)\right)\right]
这里
πw(x)=p(y=1∣x;w)=11+exp(−wTx)\begin{aligned}
\pi_w(x) = p(y=1 \mid x; w) =\frac{1}{1+\exp \left(-w^{T} x\right)}
\end{aligned}
写成这个形式就已经可以用诸如Pytorch这类工具来进行自动求导然后采用梯度下降法求解了。不过若需要用表达式直接计算出梯度,我们还需要将损失函数继续化简为:
l(w)=−N∑i=1(yiwTxi−log(1+exp(wTxi)))\mathcal{l}(w) = -\sum_{i=1}^N(y_i w^T x_i - \log(1 + \exp(w^T x_i)))
可将梯度表示如下:
∇wl(w)=−N∑i=1(yi−1exp(−wTx)+1)xi\nabla_w{\mathcal{l}(w)} = -\sum_{i=1}^N(y_i - \frac{1}{\exp(-w^Tx)+1})x_i
2. 基于Spark的并行化实现
逻辑回归的目标函数常采用梯度下降法求解,该算法的并行化可以采用如下的Map-Reduce架构:
先将第tt轮迭代的权重广播到各worker,各worker计算一个局部梯度(map过程),然后再将每个节点的梯度聚合(reduce过程),最终对参数进行更新。
在Spark中每个task对应一个分区,决定了计算的并行度(分区的概念详间我们上一篇博客《Spark: 单词计数(Word Count)的MapReduce实现(Java/Python)》 )。在Spark的实现过程如下:
- map阶段: 各task运行
map()
函数对每个样本(xi,yi)(x_i, y_i)计算梯度gig_i, 然后对每个样本对应的梯度运行进行本地聚合,以减少后面的数据传输量。如第1个task执行MARKDOWN_HASH4dddf55964835d3f9e941a93d1f83e93MARKDOWNHASH
操作得到˜g1=∑3i=1gi\widetilde{g}_1 = \sum\{i=1}^3 g_i 如下图所示: - reduce阶段:使用
reduce()
将所有task的计算结果收集到Driver端进行聚合,然后进行参数更新。
在上图中,训练数据用points:PrallelCollectionRDD
来表示,参数向量用ww来表示,注意参数向量不是RDD,只是一个单独的参与运算的变量。
此外需要注意一点,虽然每个task在本地进行了局部聚合,但如果task过多且每个task本地聚合后的结果(单个gradient)过大那么统一传递到Driver端仍然会造成单点的网络平均等问题。为了解决这个问题,Spark设计了性能更好的treeAggregate()
操作,使用树形聚合方法来减少网络和计算延迟。
3. PySpark实现代码
PySpark的完整实现代码如下:
from sklearn.datasets import load_breast_cancer
import numpy as np
from pyspark.sql import SparkSession
from operator import add
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
n_slices = 3 # Number of Slices
n_iterations = 300 # Number of iterations
alpha = 0.01 # iteration step\_size
def logistic\_f(x, w):
return 1 / (np.exp(-x.dot(w)) + 1)
def gradient(point: np.ndarray, w: np.ndarray) -> np.ndarray:
""" Compute linear regression gradient for a matrix of data points
"""
y = point[-1] # point label
x = point[:-1] # point coordinate
# For each point (x, y), compute gradient function, then sum these up
return - (y - logistic_f(x, w)) * x
if __name__ == "\_\_main\_\_":
X, y = load_breast_cancer(return_X_y=True)
D = X.shape[1]
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.3, random_state=0)
n_train, n_test = X_train.shape[0], X_test.shape[0]
spark = SparkSession\
.builder\
.appName("Logistic Regression")\
.getOrCreate()
matrix = np.concatenate(
[X_train, np.ones((n_train, 1)), y_train.reshape(-1, 1)], axis=1)
points = spark.sparkContext.parallelize(matrix, n_slices).cache()
# Initialize w to a random value
w = 2 * np.random.ranf(size=D + 1) - 1
print("Initial w: " + str(w))
for t in range(n_iterations):
print("On iteration %d" % (t + 1))
g = points.map(lambda point: gradient(point, w)).reduce(add)
w -= alpha * g
y_pred = logistic_f(np.concatenate(
[X_test, np.ones((n_test, 1))], axis=1), w)
pred_label = np.where(y_pred < 0.5, 0, 1)
acc = accuracy_score(y_test, pred_label)
print("iterations: %d, accuracy: %f" % (t, acc))
print("Final w: %s " % w)
print("Final acc: %f" % acc)
spark.stop()
注意spark.sparkContext.parallelize(matrix, n_slices)
中的n_slices
就是Spark中的分区数。我们在代码中采用breast cancer数据集进行训练和测试,该数据集是个二分类数据集。模型初始权重采用随机初始化。
最后,我们来看一下算法的输出结果。
初始权重如下:
Initial w: [-0.0575882 0.79680833 0.96928013 0.98983501 -0.59487909 -0.23279241
-0.34157571 0.93084048 -0.10126002 0.19124314 0.7163746 -0.49597826
-0.50197367 0.81784642 0.96319482 0.06248513 -0.46138666 0.76500396
0.30422518 -0.21588114 -0.90260279 -0.07102884 -0.98577817 -0.09454256
0.07157487 0.9879555 0.36608845 -0.9740067 0.69620032 -0.97704433
-0.30932467]
最终的模型权重与在测试集上的准确率结果如下:
Final w: [ 8.22414803e+02 1.48384087e+03 4.97062125e+03 4.47845441e+03
7.71390166e+00 1.21510016e+00 -7.67338147e+00 -2.54147183e+00
1.55496346e+01 6.52930570e+00 2.02480712e+00 1.09860082e+02
-8.82480263e+00 -2.32991671e+03 1.61742379e+00 8.57741145e-01
1.30270454e-01 1.16399854e+00 2.09101988e+00 5.30845885e-02
8.28547658e+02 1.90597805e+03 4.93391021e+03 -4.69112527e+03
1.10030574e+01 1.49957834e+00 -1.02290791e+01 -3.11020744e+00
2.37012097e+01 5.97116694e+00 1.03680530e+02]
Final acc: 0.923977
可见我们的算法收敛良好。
参考
- [1] GiHub: Spark官方Python样例
- [2] 王树森-并行计算与机器学习(1/3)
- [3] 刘铁岩,陈薇等. 分布式机器学习:算法、理论与时间[M]. 机械工业出版社, 2018.
-
[4] 许利杰,方亚芬. 大数据处理框架Apache Spark设计与实现[M]. 电子工业出版社, 2021.
- 2. 基于Spark的并行化实现
- 3. PySpark实现代码
-
__EOF__
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-HzYGUgf3-1653713624502)(https://blog.csdn.net/orion-orion)]猎户座 - 本文链接: https://blog.csdn.net/orion-orion/p/16318810.html
- 关于博主: 本科CS系蒟蒻,机器学习半吊子,并行计算混子。
- 版权声明: 欢迎您对我的文章进行转载,但请务必保留原始出处哦(^▽^)。
- 声援博主: 如果您觉得文章对您有帮助,可以点击文章右下角【推荐】一下。
转载请注明:xuhss » 分布式机器学习:逻辑回归的并行化实现(PySpark)