import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Naive Bayes prediction job.
 * @author ZD
 */
public class BayesTest {

    private static class BayesTestMapper extends Mapper<LongWritable, Text, Text, Text> {

        private Map<String, Integer> fy = new HashMap<>();  // class frequency table
        private Map<String, Integer> fxy = new HashMap<>(); // attribute frequency table

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // Load the frequency tables produced by the training job.
            FileSystem fs = FileSystem.get(context.getConfiguration());
            BufferedReader br = new BufferedReader(
                    new InputStreamReader(fs.open(new Path("/Bayes/train/part-r-00000"))));
            String line;
            while ((line = br.readLine()) != null) {
                String[] strs = line.split("\t"); // training output is tab-separated
                if (line.contains(":")) {
                    fxy.put(strs[0], Integer.parseInt(strs[1])); // "class:attribute" -> count
                } else {
                    fy.put(strs[0], Integer.parseInt(strs[1]));  // "class" -> count
                }
            }
            br.close();
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] strs = value.toString().split(" "); // prediction records are space-separated
            int max = -1;
            String finalType = "";
            for (String type : fy.keySet()) {
                int tempMax = 1;
                int fyCount = fy.get(type);
                // strs.length gives the number of attributes; strs[0] is the record id
                int fxyCount = 1;
                for (int i = 1; i < strs.length; i++) {
                    Integer count = fxy.get(type + ":" + strs[i]);
                    fxyCount *= (count == null) ? 1 : count; // unseen attribute contributes 1
                }
                tempMax = fyCount * fxyCount;
                if (tempMax > max) {
                    max = tempMax;
                    finalType = type;
                }
            }
            context.write(value, new Text(finalType));
        }
    }

    private static class BayesTestReducer extends Reducer<Text, Text, Text, Text> {

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Identity reduce: emit each (record, predicted class) pair unchanged.
            for (Text value : values) {
                context.write(key, value);
            }
        }
    }

    public static void main(String[] args) {
        try {
            Configuration cfg = HadoopCfg.getConfigration(); // project-local config helper
            Job job = Job.getInstance(cfg);
            job.setJobName("BayesTest");
            job.setJarByClass(BayesTest.class);
            job.setMapperClass(BayesTestMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            job.setReducerClass(BayesTestReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            FileInputFormat.addInputPath(job, new Path("/input/bayes/test/"));
            FileOutputFormat.setOutputPath(job, new Path("/Bayes/test/"));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
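The mapper scores each candidate class as its class frequency multiplied by the frequencies of the record's attributes under that class (keyed as "class:attribute"), and emits the class with the highest score. A minimal standalone sketch of that scoring rule, without Hadoop; the class name BayesScoreSketch and the sample counts in demo() are made-up illustrative data, not output of the training job:

```java
import java.util.HashMap;
import java.util.Map;

public class BayesScoreSketch {

    // fy: class -> frequency; fxy: "class:attribute" -> frequency.
    // strs[0] is the record id, strs[1..] are attribute values.
    static String predict(String[] strs, Map<String, Integer> fy, Map<String, Integer> fxy) {
        int max = -1;
        String finalType = "";
        for (String type : fy.keySet()) {
            int score = fy.get(type);
            for (int i = 1; i < strs.length; i++) {
                Integer count = fxy.get(type + ":" + strs[i]);
                score *= (count == null) ? 1 : count; // unseen attribute contributes 1
            }
            if (score > max) {
                max = score;
                finalType = type;
            }
        }
        return finalType;
    }

    // Tiny hand-made frequency tables: "yes" scores 9*2*2 = 36, "no" scores 5*3*2 = 30.
    static String demo() {
        Map<String, Integer> fy = new HashMap<>();
        fy.put("yes", 9);
        fy.put("no", 5);
        Map<String, Integer> fxy = new HashMap<>();
        fxy.put("yes:sunny", 2);
        fxy.put("no:sunny", 3);
        fxy.put("yes:hot", 2);
        fxy.put("no:hot", 2);
        return predict("1 sunny hot".split(" "), fy, fxy);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "yes"
    }
}
```

Note that the real mapper multiplies raw counts rather than conditional probabilities; the argmax is the same only when the per-class denominators cancel, and integer products can overflow for many attributes, which is a known limitation of this simple scheme.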