最近一直在学习hadoop 这是一个简单的关于MapReduce的示例
通过实现map和reduce2个函数完成操作
首先定义一个自定义对象
class MyWriterble implements Writable{
long UpPackNum;
long DownPackNum;
long UpPayLoad;
long DownPayLoad;
public MyWriterble(){}
public MyWriterble(String UpPackNum,String DownPackNum,String UpPayLoad,String DownPayLoad){
this.UpPackNum=Long.parseLong(UpPackNum);
this.DownPackNum=Long.parseLong(DownPackNum);
this.UpPayLoad=Long.parseLong(UpPayLoad);
this.DownPayLoad=Long.parseLong(DownPayLoad);
}
@Override
public void readFields(DataInput in) throws IOException {
// TODO Auto-generated method stub
this.UpPackNum=in.readLong();
this.DownPackNum=in.readLong();
this.UpPayLoad=in.readLong();
this.DownPayLoad=in.readLong();
}
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(UpPackNum);
out.writeLong(DownPackNum);
out.writeLong(UpPayLoad);
out.writeLong(DownPayLoad);
}
@Override
public String toString() {
// TODO Auto-generated method stub
return UpPackNum + "\t" + DownPackNum + "\t" + UpPayLoad + "\t" + DownPayLoad;
}
}
其中类必须实现Writable接口 和Java中的可序列化接口一样
然后分别定义2个类 用于重写map和reduce方法
class MyMappera extends Mapper<LongWritable,Text, Text,MyWriterble>{
protected void map(LongWritable key, Text value,Context context) throws IOException ,InterruptedException {
String vals[]=value.toString().split("\t");
MyWriterble my=new MyWriterble(vals[21], vals[22], vals[23], vals[24]);
context.write(new Text(vals[2]),my);
};
}
Mapper中的泛型 分别代表k1,v1,k2,v2类型
k1,v1即使分割每一行数据得到的键值对,key为位置,value为行内容
k2,v2为输出值得类型,k2表示每个手机号,所以为Text
class myreducers extends Reducer<Text,MyWriterble,Text,MyWriterble>{
protected void reduce(Text text, java.lang.Iterable<MyWriterble> wt,Context context) throws IOException ,InterruptedException {
long UpPackNum=0l;
long DownPackNum=0l;
long UpPayLoad=0l;
long DownPayLoad=0l;
for(MyWriterble mt:wt){
UpPackNum=UpPackNum+mt.UpPackNum;
DownPackNum=DownPackNum+mt.DownPackNum;
UpPayLoad=UpPayLoad+mt.UpPayLoad;
DownPayLoad=DownPayLoad+DownPayLoad;
}
context.write(text, new MyWriterble(UpPackNum+"", DownPackNum+"", UpPayLoad+"", DownPayLoad+""));
};
public class MyRIZHI {
public static void main(String[] args) throws Exception{
final String INPUT_PATHs = "hdfs://chaoren:9000/ncmdp_08500001_Net_20130515164000.dat";
final String OUT_PATHs = "hdfs://chaoren:9000/out";
Job job=new Job(new Configuration(),MyRIZHI.class.getSimpleName());
//1.1 指定输入文件路径
FileInputFormat.setInputPaths(job, INPUT_PATHs);
//指定哪个类用来格式化输入文件
job.setInputFormatClass(TextInputFormat.class);
//1.2指定自定义的Mapper类
job.setMapperClass(MyMappera.class);
//指定输出<k2,v2>的类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(MyWriterble.class);
//1.3 指定分区类
job.setPartitionerClass(HashPartitioner.class);
job.setNumReduceTasks(1);
//1.4 TODO 排序、分区
//1.5 TODO (可选)合并
//2.2 指定自定义的reduce类
job.setReducerClass(myreducers.class);
//指定输出<k3,v3>的类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(MyWriterble.class);
//2.3 指定输出到哪里
FileOutputFormat.setOutputPath(job, new Path(OUT_PATHs));
//设定输出文件的格式化类
job.setOutputFormatClass(TextOutputFormat.class);
//把代码提交给JobTracker执行
job.waitForCompletion(true);
}
}
分享到:
相关推荐
Hadoop短视频流量-Hadoop短视频流量系统-Hadoop短视频流量系统源码-Hadoop短视频流量管理系统-Hadoop短视频流量管理系统java代码-Hadoop短视频流量系统设计与实现-基于springboot的Hadoop短视频流量系统-基于Web的...
Hadoop短视频流量-Hadoop短视频流量系统-Hadoop短视频流量系统源码-Hadoop短视频流量管理系统-Hadoop短视频流量管理系统java代码-Hadoop短视频流量系统设计与实现-基于springboot的Hadoop短视频流量系统-基于Web的...
基于Hadoop网站流量日志数据分析系统 1、典型的离线流数据分析系统 2、技术分析 - Hadoop - nginx - flume - hive - mysql - springboot + mybatisplus+vcharts nginx + lua 日志文件埋点的 基于Hadoop网站流量...
1、根据实际离线流量分析特点, 使用云计算技术设计基于的离线流量分析系统解决海量流量数据 的存储和分析难题。 2、为提高流量分析系 统可用 性 , 设计分布 式集群 的 管理 、 监控 、 告 警 和 优 化 系 统 , 以...
hadoop流量统计程序hadoop流量统计程序hadoop流量统计程序hadoop流量统计程序
一个hadoop的用户流量分析系统的原始数据资料,可用作测试,一般学习等。 博客地址:http://blog.csdn.net/sdksdk0
小例子是自己写的,目的是让自己熟悉一下如何在集群上运行一个mapreduce项目,大家可以参考此例子学习hadoop,对入门很有帮助。小例子是自己写的,目的是让自己熟悉一下如何在集群上运行一个mapreduce项目,大家可以...
该文件可以帮助我们练习Hadoop的统计功能。
基于Hadoop的网络流量分流并行化设计与实现
hadoop处理海小文件的一种改进方法的文章,可供参考。
关于常用的一个hadoop的python脚本代码,包括两种字典,一种是file分发的,一种是靠输入并且,一起经过shuffle排序,再进行计算的。限于文件大小限制,只有代码,不包含字典文件,所以不能直接运行,请见谅
第四章(Hadoop大数据处理实战)Hadoop分布式文件系统.pdf第四章(Hadoop大数据处理实战)Hadoop分布式文件系统.pdf第四章(Hadoop大数据处理实战)Hadoop分布式文件系统.pdf第四章(Hadoop大数据处理实战)Hadoop分布式文件...
基于Hadoop网站流量分析系统源码(课设项目).zip基于Hadoop网站流量分析系统源码(课设项目).zip基于Hadoop网站流量分析系统源码(课设项目).zip基于Hadoop网站流量分析系统源码(课设项目).zip基于Hadoop网站流量分析...
基于Hadoop网站流量日志数据分析系统项目源码+教程.zip网站流量日志数据分析系统 典型的离线流数据分析系统 技术分析 hadoop nginx flume hive sqoop mysql springboot+mybatisplus+vcharts 基于Hadoop网站流量日志...
云计算框架之一hadoop常用的算法例子
《Hadoop大数据处理》共10章涉及的主题包括大数据处理概论、基于Hadoop的大数据处理框架、MapReduce计算模式、使用HDFS存储大数据、HBase大数据库、大数据的分析处理、Hadoop环境下的数据整合、Hadoop集群的管理与...
eclipse hadoop例子源代码 eclipse hadoop例子源代码
hadoop 框架下 mapreduce源码例子 wordcount ,eclipse下,hadoop 2.2 可以运行