前段时间学习了并行与分布式技术,为了写了篇关于KMeans算法的并行和分布式的编程写法,上网找了挺久,没想到网上并没有很多资料,那今天就来说一下我是怎么写的吧。
首先来讲一下K-Means的思想原理吧!
K-Means算法思想原理
K-means算法是根据给定
n
n
n个对象的数据集,构建
K
K
K个划分聚类方法,每个划分聚类称为簇。在这K个簇中,每个簇至少有一个数据对象,且每个数据对象有且只可属于一个簇。簇中的数据还有一个必须遵守的法则:同一个簇内的数据对象相似度高,不同簇的数据对象相似度低。这里的相似度将采用距离来衡量。
根据下图,可以看到K-means算法包括初始中心点的选择,对剩余数据对象遍历,进行相似度(距离)的计算,将相似度最高的数据点划分至该簇,重新计算中心点,再次相似度计算,直至代价函数达到最小值,即数据中心不再移动为止。 ![kmeans图片](https://img-blog.csdnimg.cn/fb2d32b06242458c95fca75edace0a4f.png#pic_center)
算法步骤总结
step1: 随机初始化
K
K
K个聚类中心点,循环Z次,
ε
=
1
0
−
5
\varepsilon=10^{-5}
ε=10−5。
step2: 遍历其余样本点计算与各中心点的距离,选择相似度最高的聚为一类。
\step3: 计算代价函数
J
J
J,若
∣
J
∣
≤
ε
|J|\leq\varepsilon
∣J∣≤ε或所有的观测值不再被分配或
k
k
k大于指定循环次数则退出循环,
k
=
k
+
1
k = k + 1
k=k+1。
step4: 重新计算新聚类中心点,返回step2。
接下来就是代码实现啦!
首先来看看只是用Numpy是怎么写的吧!
使用Numpy实现
为了方便理解,附上流程图如下: Numpy的实现其实就是把所有数据丢进一个矩阵,然后算算算就好了😀
import numpy as np
import pandas as pd
# 找出最优簇选择(初始)
def initial_value(n, k):
minJ = np.min(data, axis=0)
maxJ = np.max(data, axis=0)
rangeJ = maxJ - minJ
centroids = minJ + rangeJ * np.random.rand(k, 1)
return centroids
# 算每个的距离,,取最小距离的索引
def distance(data, centroids):
d = np.sqrt(((data-centroids[:, np.newaxis])**2).sum(axis=2))
return np.argmin(d, axis=0)
# 更新簇中心
def update(res, K):
return np.array([data[res==k].mean(axis=0) for k in range(K)])
def main(k, data, iters):
m, n = np.shape(data)
centroids = initial_value(n, k) # 初始点
for i in range(iters):
res = distance(data, centroids) # 最小距离索引
new_centroids = update(res, k) # 更新簇
if (new_centroids == centroids).all(): # 若更新前后中心相同,跳出循环
break
centroids = new_centroids
return res
if __name__ == '__main__':
data = pd.read_csv('data.csv').values
k, iters = 2, 100
result = main(k, data, iters)
print(result)
这种方式在数据量不大的时候其实还是很好用的,但如果数据量特大,和并行和分布式比起来就不是很占优势了。
多进程并行实现
老规矩,附上流程图: 在并行中,你会发现欸,怎么这流程不太一样了。。。 其实为了加快计算速度,我把数据分为8份丢到8个矩阵中同时计算,是不是听着就觉得快很多了呢!然后根据计算结果分别求平均值,最后找到新的中心点,循环到满意就ok啦。 至于我为什么是分8个矩阵,进程分配主要看自己的电脑,我的电脑是8核的。
import numpy as np
import pandas as pd
import multiprocessing
# 找出最优簇选择(初始)
def initial_value(n, k):
minJ = np.min(data, axis=0)
maxJ = np.max(data, axis=0)
rangeJ = maxJ - minJ
centroids = minJ + rangeJ * np.random.rand(k, 1)
return centroids
# 算每个的距离,,取最小距离的索引
def distance(data, centroids):
d = np.sqrt(((data-centroids[:, np.newaxis])**2).sum(axis=2))
return np.argmin(d, axis=0)
# 每个进程平均值
def avg(K, centroids, data):
res = distance(data, centroids)
return np.array([data[res==k].mean(axis=0) for k in range(K)]), res
def job(z):
return avg(z[0], z[1], z[2])
def main(k, data, iters):
m, n = np.shape(data)
centroids = initial_value(n, k) # 初始点
pool = multiprocessing.Pool(processes=8)
data_lst = [(k, centroids, data)]
for i in range(iters):
update = pool.map(job, data_lst) # 求得每个进程平均
part_centroids, res = update[0][0], update[0][1]
new_centroids = part_centroids.mean(axis=0)
if (centroids == new_centroids).all(): # 若更新前后中心相同,跳出循环
break
centroids = new_centroids
return res
if __name__ == '__main__':
data = pd.read_csv('data.csv').values
k, iters = 2, 100
result = main(k, data, iters)
print(result)
Dask分布式实现
嗯,上图! 其实并行和分布式感觉上的流程是差不多的,浅显的理解一下,分布式就是利用电脑上的多个部件疯狂肝,分布式是通过局域网在多台电脑上疯狂肝。
import numpy as np
import dask.array as da
import dask.dataframe as dd
import pandas as pd
from dask.distributed import Client
# 找出最优簇选择(初始)
def initial_value(n, k):
minJ = da.min(data, axis=0)
maxJ = da.max(data, axis=0)
rangeJ = maxJ - minJ
centroids = minJ + rangeJ * np.random.rand(k, 1)
return centroids.compute()
# 算每个的距离,,取最小距离的索引
def distance(data, centroids):
d = da.sqrt(((data-centroids[:, np.newaxis])**2).sum(axis=2))
return np.argmin(d, axis=0)
# 每个进程平均值
def avg(K, centroids, data):
res = distance(data, centroids)
return da.array([data[res==k].mean(axis=0) for k in range(K)])
def job(z):
return avg(z[0], z[1], z[2])
def main(k, data, iters):
m, n = da.shape(data)
centroids = initial_value(n, k) # 初始点
data_lst = [(k, centroids, data)]
for i in range(iters):
update = client.map(job, data_lst) # 求得每个进程平均
new_centroids_sum = client.submit(sum, update)
new_centroids = new_centroids_sum.result()/len(update)
if (centroids == new_centroids.compute()).all(): # 若更新前后中心相同,跳出循环
break
centroids = new_centroids
res = distance(data, centroids)
return res.compute()
if __name__ == '__main__':
client = Client('192.168.0.106:8786')
data = pd.read_csv('data.csv').values
k, iters = 2, 100
result = main(k, data, iters)
print(result)
结果对比
从下表中可以看到,速度从快到慢为:并行>Numpy>分布式>for循环。
方法耗时
(
s
)
(s)
(s)for循环5296.72Numpy383.95并行36.26分布式450.92
是不是很震惊,并行和分布式的速度居然差这么多!在我的理解上,分布式的速度快慢更多的是与网络好坏有关,可能小编的网真的很差吧😔!
这次的分享就到这啦,有什么意见或者建议可以在评论区里说说哟!
|