基于MapReduce实现的Kmeans算法(非调库)

您所在的位置:网站首页 我温柔又暴躁的妈妈作文怎么写 基于MapReduce实现的Kmeans算法(非调库)

基于MapReduce实现的Kmeans算法(非调库)

2023-07-29 00:40| 来源: 网络整理| 查看: 265

简单基于MapReduce实现了下KMeans。

算法思路

KMeans算法作为一种划分式的聚类算法,利用MapReduce进行实现的主要难点在于满足KMeans每次迭代划分过程的中间结果保存。 因此利用HDFS进行中心点的存储,以实现各节点间的数据共享。 基于MapReduce的KMeans算法流程如下:

随机分配簇,初始化中心点,存入HDFS。Mapper中读取数据文件中的每条数据并与中心点进行距离计算,输出key为最近的中心点序号。Reducer中进行归并,计算新的中心点,存入新的中心文件。判断停机条件,不满足则复制新的中心文件到原中心文件,重复2,3步骤。输出聚类结果,包括数据点信息与对应簇序号。 初始化中心点

利用Mapper读取每一个元素的向量信息,随机赋值,在Reducer中计算中心点信息。由于中心点的计算与迭代时的计算相同,与迭代计算共用一个reducer类。

CenterRandomMapper

随机赋值的mapper类。

protected void setup(Context context) throws IOException, InterruptedException { // 读取k值 Configuration configuration = context.getConfiguration();; k = configuration.getInt("cluster.k", 3); }

setup中读取配置的聚类簇数量。

protected void map(Object key, Text value, Context context) throws IOException, InterruptedException { // 随机分配簇 int index = (int) (Math.random() * k); System.out.println(index); context.write(new Text(Integer.toString(index)), value); }

map方法中,根据聚类簇数量,对每个元素赋予随机的类簇序号,作为输出的key。value为元素向量,保持不变。

CenterRandomAdapter

初始化中心点的任务配置类,实现一个static方法。

public static void createRandomCenter(String dataPath, String centerPath, int k){ Configuration hadoopConfig = new Configuration(); hadoopConfig.setInt("cluster.k", k); try { Job job = Job.getInstance(hadoopConfig, "random center task"); job.setJarByClass(KmeansRun.class); job.setMapperClass(CenterRandomMapper.class); job.setReducerClass(KmeansReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); // 输出为新计算得到的center,已存在则删除 Path outPath = new Path(centerPath); outPath.getFileSystem(hadoopConfig).delete(outPath, true); //job执行作业时输入和输出文件的路径 FileInputFormat.addInputPath(job, new Path(dataPath)); FileOutputFormat.setOutputPath(job, new Path(centerPath)); //执行job,直到完成 job.waitForCompletion(true); System.out.println("random center task"); } }

该方法包含三个参数,分别为数据文件地址,中心点文件地址以及聚类数。首先在配置中设置聚类数,方便mapper中进行读取。 设置对应的mapper和reducer类以及输入输出格式,需要注意的是reducer使用了KmeansReducer类,即正式迭代时计算中心点的reducer。由于HDFS不能直接进行同名文件的覆盖,所以在每次生成新的中心点文件时,需要判断是否已经存在同名文件,存在则删除。

分配元素对应簇,计算中心点

利用Mapper将所有元素与中心点进行对比,分配到最近的簇中。利用Reducer进行求和并计算新的中心点信息。

KmeansMapper private ArrayList centers = null; @Override protected void setup(Context context) throws IOException, InterruptedException { // 读一下centers // 地址从配置中拿好了 Configuration configuration = context.getConfiguration(); String centerPath = configuration.get("cluster.center_path"); centers = DataUtil.readCenter(centerPath);} 在mapper执行前,利用setup方法进行中心点的读取。DataUtil为工具类,readCenter()实现对HDFS中center文件夹中所有文件信息的读取。 public static ArrayList readCenter(String centerPath) throws IOException { ArrayList centers = new ArrayList(); Path path = new Path(centerPath); Configuration conf = new Configuration(); FileSystem fileSystem = path.getFileSystem(conf); if(fileSystem.isDirectory(path)){ // 文件夹,遍历读取 FileStatus[] listFile = fileSystem.listStatus(path); for (FileStatus fileStatus : listFile) { LineReader lineReader = getLineReader(fileStatus.getPath().toString()); readCenterLines(lineReader, centers); } }else { // 普通文件,直接读取 LineReader lineReader = getLineReader(centerPath); readCenterLines(lineReader, centers); } return centers; }

判断地址属性,对文件夹进行遍历,利用LineReader进行所有文件信息的读取,最终返回二维double集合形式的中心点信息。

protected void map(Object key, Text value, Context context) throws IOException, InterruptedException { ArrayList element = DataUtil.splitStringIntoArray(value.toString()); // 选择最近中心点,将其作为key int index = CalUtil.selectNearestCenter(element, centers); context.write(new Text(Integer.toString(index)), value); }

map方法中进行邻近中心点的选择,将其对应的序号作为key进行输出。CalUtil为计算工具类,selectNearestCenter()实现了最近中心点的获取。

public static int selectNearestCenter(ArrayList element, ArrayList centers){ double minDis = 100000; int nearstIndex = 0; for(int i=0;i minDis = dis; nearstIndex = i; } } return nearstIndex; }

遍历进行最小值的选取,本此实现中calDistance()实现欧式距离的计算。

KMeansReducer protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { ArrayList sumElement = new ArrayList(); int num = 0; // 遍历values相加,求新中心点 for(Text t:values){ num += 1; ArrayList element = DataUtil.splitStringIntoArray(t.toString()); if(sumElement.size() for(int i=0;i for(int i=0;i // 停机,不做修改 return true; }else{ // 覆盖原中心文件 System.out.println("distanceSum=" + distanceSum); DataUtil.changeCenters(centerPath, newCenterPath, new Configuration()); return false; }

主要是利用CalUtil.calDistanceBetweenCenters计算新旧两组中心点之间的距离差值,因为较难把控阈值信息,直接就等两组中心点完全相同时实现停机,返回true。

// 计算两次迭代的中心是否有变化,返回距离 public static double calDistanceBetweenCenters(ArrayListoldCenter, ArrayListnewCenter){ // 因为data的读入顺序相同,所以最终收敛时聚类中心的顺序也相同 // 只要遍历计算距离即可,不用考虑中心点本身顺序 double sum = 0; for(int i=0;i // 设置原中心点 Configuration hadoopConfig = new Configuration(); hadoopConfig.set("cluster.center_path", centerPath); try { Job job = Job.getInstance(hadoopConfig, "one round cluster task"); job.setJarByClass(KmeansRun.class); job.setMapperClass(KmeansMapper.class); job.setReducerClass(KmeansReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); // 输出为新计算得到的center,已存在则删除 Path outPath = new Path(newCenterPath); outPath.getFileSystem(hadoopConfig).delete(outPath, true); //job执行作业时输入和输出文件的路径 FileInputFormat.addInputPath(job, new Path(dataPath)); FileOutputFormat.setOutputPath(job, new Path(newCenterPath)); //执行job,直到完成 job.waitForCompletion(true); System.out.println("finish one round cluster task"); } catch (IOException | InterruptedException | ClassNotFoundException e) { e.printStackTrace(); } }

该方法拥有三个参数,分别为数据集地址,旧中心点文件地址与新中心点文件地址。设置任务mapper为KmeansMapper,reducer为KmeansReducer。在configuration中设置旧中心点文件地址,方便mapper读取。输入文件为数据集,输出文件地址设置为新中心点文件地址。

生成聚类结果

KmeansAdapter 该类中实现聚类结果输出任务设置方法createClusterResult()。 聚类结果即为KmeansMapper的输出结果,故只要调用mapper并输出结果即可。

public static void createClusterResult(String dataPath, String centerPath, String clusterResultPath){ // 设置原中心点 Configuration hadoopConfig = new Configuration(); hadoopConfig.set("cluster.center_path", centerPath); try { Job job = Job.getInstance(hadoopConfig, "cluster result task"); job.setJarByClass(KmeansRun.class); // 无reducer job.setMapperClass(KmeansMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); // 输出为新计算得到的center,已存在则删除 Path outPath = new Path(clusterResultPath); outPath.getFileSystem(hadoopConfig).delete(outPath, true); //job执行作业时输入和输出文件的路径 FileInputFormat.addInputPath(job, new Path(dataPath)); FileOutputFormat.setOutputPath(job, new Path(clusterResultPath)); //执行job,直到完成 job.waitForCompletion(true); System.out.println("cluster result task finished"); } catch (IOException | InterruptedException | ClassNotFoundException e) { e.printStackTrace(); } }

该方法包括三个参数,分别为数据集地址,中心点地址以及聚类结果输出地址。设置MapperClass即可。

总流程实现 KmeansRun

该类中实现需要调用的main方法。

public static void main(String[] args){ // 命令行参数为数据集名称与聚类数 String dataName = args[0]; int k = Integer.parseInt(args[1]); String centerPath = DataUtil.HDFS_OUTPUT + "/centers.txt"; String newCenterPath = DataUtil.HDFS_OUTPUT + "/new_centers.txt"; String dataPath = DataUtil.HDFS_INPUT + "/" + dataName; String clusterResultPath = DataUtil.HDFS_OUTPUT + "/kmeans_cluster_result.txt"; // 初始化随机中心点 CenterRandomAdapter.createRandomCenter(dataPath, centerPath, k); // 默认1000次,中途停退出 for(int i=0;i


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3