wordcount源码_word accounts

hacker|
101

文章目录:

系统中实现参数的配置读写,用什么方式好

WordCountHbaseReaderMapper类继承了TableMapper Text,Text抽象类,TableMapper类专门用于完成MapReduce中Map过程与Hbase表之间的操作。此时的map(ImmutableBytesWritable key,Result value,Context context)方法,第一个参数key为Hbase表的rowkey主键,第二个参数value为key主键对应的记录集合,此处的map核心实现是遍历key主键对应的记录集合value,将其组合成一条记录通过contentx.write(key,value)填充到 key,value键值对中。 详细源码请参考:WordCountHbaseReader\src\com\zonesion\hbase\WordCountHbaseReader.java public static class WordCountHbaseReaderMapper extends TableMapperText,Text{@Overrideprotected void map(ImmutableBytesWritable key,Result value,Context context)throws IOException, InterruptedException {StringBuffer sb = new StringBuffer("");for(Entrybyte[],byte[] entry:value.getFamilyMap("content".getBytes()).entrySet()){String str = new String(entry.getValue());//将字节数组转换为String类型if(str != null){sb.append(new String(entry.getKey()));sb.append(":");sb.append(str);}context.write(new Text(key.get()), new Text(new String(sb)));}} }3、 Reducer函数实现 此处的WordCountHbaseReaderReduce实现了直接输出Map输出的 key,value键值对,没有对其做任何处理。详细源码请参考:WordCountHbaseReader\src\com\zonesion\hbase\WordCountHbaseReader.java public static class WordCountHbaseReaderReduce extends ReducerText,Text,Text,Text{private Text result = new Text();@Overrideprotected void reduce(Text key, IterableText values,Context context)throws IOException, InterruptedException {for(Text val:values){result.set(val);context.write(key, result);}} }4、 驱动函数实现 与WordCount的驱动类不同,在Job配置的时候没有配置job.setMapperClass(),而是用以下方法执行Mapper类: TableMapReduceUtil.initTableMapperJob(tablename,scan,WordCountHbaseReaderMapper.class, Text.class, Text.class, job); 该方法指明了在执行job的Map过程时,数据输入源是hbase的tablename表,通过扫描读入对象scan对表进行全表扫描,为Map过程提供数据源输入,通过WordCountHbaseReaderMapper.class执行Map过程,Map过程的输出key/value类型是 Text.class与Text.class,最后一个参数是作业对象。特别注意:这里声明的是一个最简单的扫描读入对象scan,进行表扫描读取数据,其中scan可以配置参数,这里为了例子简单不再详述,用户可自行尝试。 详细源码请参考:WordCountHbaseReader\src\com\zonesion\hbase\WordCountHbaseReader.java public static void main(String[] args) throws Exception {String tablename = "wordcount";Configuration conf = HBaseConfiguration.create();conf.set("hbase.zookeeper.quorum", "Master");String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();if (otherArgs.length != 1) {System.err.println("Usage: WordCountHbaseReader out");System.exit(2);}Job job = new Job(conf, "WordCountHbaseReader");job.setJarByClass(WordCountHbaseReader.class);//设置任务数据的输出路径;FileOutputFormat.setOutputPath(job, new Path(otherArgs[0]));job.setReducerClass(WordCountHbaseReaderReduce.class);Scan scan = new Scan();TableMapReduceUtil.initTableMapperJob(tablename,scan,WordCountHbaseReaderMapper.class, Text.class, Text.class, job);//调用job.waitForCompletion(true) 执行任务,执行成功后退出;System.exit(job.waitForCompletion(true) ? 0 : 1);}5、部署运行 1)启动Hadoop集群和Hbase服务 [hadoop@K-Master ~]$ start-dfs.sh #启动hadoop HDFS文件管理系统 [hadoop@K-Master ~]$ start-mapred.sh #启动hadoop MapReduce分布式计算服务 [hadoop@K-Master ~]$ start-hbase.sh #启动Hbase [hadoop@K-Master ~]$ jps #查看进程 22003 HMaster 10611 SecondaryNameNode 22226 Jps 21938 HQuorumPeer 10709 JobTracker 22154 HRegionServer 20277 Main 10432 NameNode

大家对spark的源码了解多少,sparkshuffle,调度,sparkstreaming的源码?

流(Streaming),在大数据时代为数据流处理,就像水流一样,是数据流;既然是数据流处理,就会想到数据的流入、数据的加工、数据的流出。

日常工作、生活中数据来源很多不同的地方。例如:工业时代的汽车制造、监控设备、工业设备会产生很多源数据;信息时代的电商网站、日志服务器、社交网络、金融交易系统、黑客攻击、垃圾邮件、交通监控等;通信时代的手机、平板、智能设备、物联网等会产生很多实时数据,数据流无处不在。

在大数据时代Spark Streaming能做什么?

平时用户都有网上购物的经历,用户在网站上进行的各种操作通过Spark Streaming流处理技术可以被监控,用户的购买爱好、关注度、交易等可以进行行为分析。在金融领域,通过Spark Streaming流处理技术可以对交易量很大的账号进行监控,防止罪犯洗钱、财产转移、防欺诈等。在网络安全性方面,黑客攻击时有发生,通过Spark Streaming流处理技术可以将某类可疑IP进行监控并结合机器学习训练模型匹配出当前请求是否属于黑客攻击。其他方面,如:垃圾邮件监控过滤、交通监控、网络监控、工业设备监控的背后都是Spark Streaming发挥强大流处理的地方。

大数据时代,数据价值一般怎么定义?

所有没经过流处理的数据都是无效数据或没有价值的数据;数据产生之后立即处理产生的价值是最大的,数据放置越久或越滞后其使用价值越低。以前绝大多数电商网站盈利走的是网络流量(即用户的访问量),如今,电商网站不仅仅需要关注流量、交易量,更重要的是要通过数据流技术让电商网站的各种数据流动起来,通过实时流动的数据及时分析、挖掘出各种有价值的数据;比如:对不同交易量的用户指定用户画像,从而提供不同服务质量;准对用户访问电商网站板块爱好及时推荐相关的信息。

SparkStreaming VS Hadoop MR:

Spark Streaming是一个准实时流处理框架,而Hadoop MR是一个离线、批处理框架;很显然,在数据的价值性角度,Spark Streaming完胜于Hadoop MR。

SparkStreaming VS Storm:

Spark Streaming是一个准实时流处理框架,处理响应时间一般以分钟为单位,也就是说处理实时数据的延迟时间是秒级别的;Storm是一个实时流处理框架,处理响应是毫秒级的。所以在流框架选型方面要看具体业务场景。需要澄清的是现在很多人认为Spark Streaming流处理运行不稳定、数据丢失、事务性支持不好等等,那是因为很多人不会驾驭Spark Streaming及Spark本身。在Spark Streaming流处理的延迟时间方面,Spark定制版本,会将Spark Streaming的延迟从秒级别推进到100毫秒之内甚至更少。

SparkStreaming优点:

1、提供了丰富的API,企业中能快速实现各种复杂的业务逻辑。

2、流入Spark Streaming的数据流通过和机器学习算法结合,完成机器模拟和图计算。

3、Spark Streaming基于Spark优秀的血统。

SparkStreaming能不能像Storm一样,一条一条处理数据?

Storm处理数据的方式是以条为单位来一条一条处理的,而Spark Streaming基于单位时间处理数据的,SparkStreaming能不能像Storm一样呢?答案是:可以的。

业界一般的做法是Spark Streaming和Kafka搭档即可达到这种效果,入下图:

Kafka业界认同最主流的分布式消息框架,此框架即符合消息广播模式又符合消息队列模式。

Kafka内部使用的技术:

1、  Cache

2、  Interface

3、  Persistence(默认最大持久化一周)

4、  Zero-Copy技术让Kafka每秒吞吐量几百兆,而且数据只需要加载一次到内核提供其他应用程序使用

外部各种源数据推进(Push)Kafka,然后再通过Spark Streaming抓取(Pull)数据,抓取的数据量可以根据自己的实际情况确定每一秒中要处理多少数据。

通过Spark Streaming动手实战wordCount实例

这里是运行一个Spark Streaming的程序:统计这个时间段内流进来的单词出现的次数. 它计算的是:他规定的时间段内每个单词出现了多少次。

1、先启动下Spark集群:

我们从集群里面打开下官方网站

接受这个数据进行加工,就是流处理的过程,刚才那个WordCount就是以1s做一个单位。

刚才运行的时候,为什么没有结果呢?因为需要数据源。

2、获取数据源:

新开一个命令终端,然后输入:

$ nc -lk 9999

现在我们拷贝数据源进入运行:

然后按回车运行

DStream和RDD关系:

没有输入数据会打印的是空结果:

但是实际上,Job的执行是Spark Streaming框架帮我们产生的和开发者自己写的Spark代码业务逻辑没有关系,而且Spark Streaming框架的执行时间间隔可以手动配置,如:每隔一秒钟就会产生一次Job的调用。所以在开发者编写好的Spark代码时(如:flatmap、map、collect),不会导致job的运行,job运行是Spark Streaming框架产生的,可以配置成每隔一秒中都会产生一次job调用。

Spark Streaming流进来的数据是DStream,但Spark Core框架只认RDD,这就产生矛盾了?

Spark Streaming框架中,作业实例的产生都是基于rdd实例来产生,你写的代码是作业的模板,即rdd是作业的模板,模板一运行rdd就会被执行,此时action必须处理数据。RDD的模板就是DStream离散流,RDD之间存在依赖关系,DStream就有了依赖关系,也就构成了DStream 有向无环图。这个DAG图,是模板。Spark Streaming只不过是在附在RDD上面一层薄薄的封装而已。你写的代码不能产生Job,只有框架才能产生Job.

如果一秒内计算不完数据,就只能调优了.

总结:

使用Spark Streaming可以处理各种数据来源类型,如:数据库、HDFS,服务器log日志、网络流,其强大超越了你想象不到的场景,只是很多时候大家不会用,其真正原因是对Spark、spark streaming本身不了解。

如何在MaxCompute上运行HadoopMR作业

MaxCompute(原ODPS)有一套自己的MapReduce编程模型和接口,简单说来,这套接口的输入输出都是MaxCompute中的Table,处理的数据是以Record为组织形式的,它可以很好地描述Table中的数据处理过程,然而与社区的Hadoop相比,编程接口差异较大。Hadoop用户如果要将原来的Hadoop MR作业迁移到MaxCompute的MR执行,需要重写MR的代码,使用MaxCompute的接口进行编译和调试,运行正常后再打成一个Jar包才能放到MaxCompute的平台来运行。这个过程十分繁琐,需要耗费很多的开发和测试人力。如果能够完全不改或者少量地修改原来的Hadoop MR代码就能在MaxCompute平台上跑起来,将是一个比较理想的方式。

现在MaxCompute平台提供了一个HadoopMR到MaxCompute MR的适配工具,已经在一定程度上实现了Hadoop MR作业的二进制级别的兼容,即用户可以在不改代码的情况下通过指定一些配置,就能将原来在Hadoop上运行的MR jar包拿过来直接跑在MaxCompute上。目前该插件处于测试阶段,暂时还不能支持用户自定义comparator和自定义key类型,下面将以WordCount程序为例,介绍一下这个插件的基本使用方式。

使用该插件在MaxCompute平台跑一个HadoopMR作业的基本步骤如下:

1. 下载HadoopMR的插件

下载插件,包名为hadoop2openmr-1.0.jar,注意,这个jar里面已经包含hadoop-2.7.2版本的相关依赖,在作业的jar包中请不要携带hadoop的依赖,避免版本冲突。

2. 准备好HadoopMR的程序jar包

编译导出WordCount的jar包:wordcount_test.jar ,wordcount程序的源码如下:

package com.aliyun.odps.mapred.example.hadoop;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

import java.util.StringTokenizer;

public class WordCount {

public static class TokenizerMapper

extends MapperObject, Text, Text, IntWritable{

private final static IntWritable one = new IntWritable(1);

private Text word = new Text();

public void map(Object key, Text value, Context context

) throws IOException, InterruptedException {

StringTokenizer itr = new StringTokenizer(value.toString());

while (itr.hasMoreTokens()) {

word.set(itr.nextToken());

context.write(word, one);

}

}

}

public static class IntSumReducer

extends ReducerText,IntWritable,Text,IntWritable {

private IntWritable result = new IntWritable();

public void reduce(Text key, IterableIntWritable values,

Context context

) throws IOException, InterruptedException {

int sum = 0;

for (IntWritable val : values) {

sum += val.get();

}

result.set(sum);

context.write(key, result);

}

}

public static void main(String[] args) throws Exception {

Configuration conf = new Configuration();

Job job = Job.getInstance(conf, "word count");

job.setJarByClass(WordCount.class);

job.setMapperClass(TokenizerMapper.class);

job.setCombinerClass(IntSumReducer.class);

job.setReducerClass(IntSumReducer.class);

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(IntWritable.class);

FileInputFormat.addInputPath(job, new Path(args[0]));

FileOutputFormat.setOutputPath(job, new Path(args[1]));

System.exit(job.waitForCompletion(true) ? 0 : 1);

}

}

3. 测试数据准备

创建输入表和输出表

create table if not exists wc_in(line string);

create table if not exists wc_out(key string, cnt bigint);

通过tunnel将数据导入输入表中

待导入文本文件data.txt的数据内容如下:

hello maxcompute

hello mapreduce

例如可以通过如下命令将data.txt的数据导入wc_in中,

tunnel upload data.txt wc_in;

4. 准备好表与hdfs文件路径的映射关系配置

配置文件命名为:wordcount-table-res.conf

{

"file:/foo": {

"resolver": {

"resolver": "c.TextFileResolver",

"properties": {

"text.resolver.columns.combine.enable": "true",

"text.resolver.seperator": "\t"

}

},

"tableInfos": [

{

"tblName": "wc_in",

"partSpec": {},

"label": "__default__"

}

],

"matchMode": "exact"

},

"file:/bar": {

"resolver": {

"resolver": "openmr.resolver.BinaryFileResolver",

"properties": {

"binary.resolver.input.key.class" : "org.apache.hadoop.io.Text",

"binary.resolver.input.value.class" : "org.apache.hadoop.io.LongWritable"

}

},

"tableInfos": [

{

"tblName": "wc_out",

"partSpec": {},

"label": "__default__"

}

],

"matchMode": "fuzzy"

}

}

hadoop hdfs 源码怎么看

在使用Hadoop的过程中,很容易通过FileSystem类的API来读取HDFS中的文件内容,读取内容的过程是怎样的呢?今天来分析客户端读取HDFS文件的过程,下面的一个小程序完成的功能是读取HDFS中某个目录下的文件内容,然后输出到控制台,代码如下:

[java] view plain copy

public class LoadDataFromHDFS {

public static void main(String[] args) throws IOException {

new LoadDataFromHDFS().loadFromHdfs("hdfs://localhost:9000/user/wordcount/");

}

public void loadFromHdfs(String hdfsPath) throws IOException {

Configuration conf = new Configuration();

Path hdfs = new Path(hdfsPath);

FileSystem in = FileSystem.get(conf);

//in = FileSystem.get(URI.create(hdfsPath), conf);//这两行都会创建一个DistributedFileSystem对象

FileStatus[] status = in.listStatus(hdfs);

for(int i = 0; i status.length; i++) {

byte[] buff = new byte[1024];

FSDataInputStream inputStream = in.open(status[i].getPath());

while(inputStream.read(buff) 0) {

System.out.print(new String(buff));

}

inputStream.close();

}

}

}

FileSystem in = FileSystem.get(conf)这行代码创建一个DistributedFileSystem,如果直接传入一个Configuration类型的参数,那么默认会读取属性fs.default.name的值,根据这个属性的值创建对应的FileSystem子类对象,如果没有配置fs.default.name属性的值,那么默认创建一个org.apache.hadoop.fs.LocalFileSystem类型的对象。但是这里是要读取HDFS中的文件,所以在core-site.xml文件中配置fs.default.name属性的值为hdfs://localhost:9000,这样FileSystem.get(conf)返回的才是一个DistributedFileSystem类的对象。 还有一种创建DistributedFileSystem这种指定文件系统类型对像的方法是使用FileSystem.get(Configuration conf)的一个重载方法FileSystem.get(URI uri, Configuration),其实调用第一个方法时在FileSystem类中先读取conf中的属性fs.default.name的值,再调用的FileSystem.get(URI uri, Configuration)方法。

我在CentOS系统中配置hadoopp,在eclipse中运行hadoopp的wordcount.java源代码

新建一个hadoop工程,如图

建一个运行wordcount的类,先不管他什么意思,代码如下

[java] view plain copy

/**

* Project: hadoop

*

* File Created at 2012-5-21

* $Id$

*/

package seee.you.app;

import java.io.IOException;

import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

public static class TokenizerMapper extends MapperLongWritable, Text, Text, IntWritable{

private final static IntWritable one = new IntWritable(1);

private Text word = new Text();

public void map(LongWritable key, Text value, Context context)

throws IOException, InterruptedException {

StringTokenizer itr = new StringTokenizer(value.toString());

while (itr.hasMoreTokens()) {

word.set(itr.nextToken());

context.write(word, one);

}

}

}

public static class IntSumReducer extends ReducerText, IntWritable, Text, IntWritable {

private IntWritable result = new IntWritable();

public void reduce(Text key, IterableIntWritable values, Context context)

throws IOException, InterruptedException {

int sum = 0;

for (IntWritable val : values) {

sum += val.get();

}

result.set(sum);

context.write(key, result);

}

}

public static void main(String[] args) throws Exception {

Configuration conf = new Configuration();

if (args.length != 2) {

System.err.println("Usage: wordcount ");

System.exit(2);

}

Job job = new Job(conf, "word count");

job.setJarByClass(WordCount.class);

job.setMapperClass(TokenizerMapper.class);

job.setReducerClass(IntSumReducer.class);

job.setMapOutputKeyClass(Text.class);

job.setMapOutputValueClass(IntWritable.class);

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(IntWritable.class);

FileInputFormat.addInputPath(job, new Path(args[0]));

FileOutputFormat.setOutputPath(job, new Path(args[1]));

System.exit(job.waitForCompletion(true) ? 0 : 1);

}

}

这时候右键run on hadoop

这时候不幸的是,报错了,错误信息如下:

[java] view plain copy

12/05/23 19:38:51 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

12/05/23 19:38:51 ERROR security.UserGroupInformation: PriviledgedActionException as:yongkang.qiyk cause:java.io.IOException: Failed to set permissions of path: \tmp\hadoop-yongkang\mapred\staging\yongkang.qiyk-1840800210\.staging to 0700

Exception in thread "main" java.io.IOException: Failed to set permissions of path: \tmp\hadoop-yongkang\mapred\staging\yongkang.qiyk-1840800210\.staging to 0700

at org.apache.hadoop.fs.FileUtil.checkReturnValue(FileUtil.java:682)

at org.apache.hadoop.fs.FileUtil.setPermission(FileUtil.java:655)

at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:509)

at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:344)

at org.apache.hadoop.fs.FilterFileSystem.mkdirs(FilterFileSystem.java:189)

at org.apache.hadoop.mapreduce.JobSubmissionFiles.getStagingDir(JobSubmissionFiles.java:116)

at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:856)

at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:850)

at java.security.AccessController.doPrivileged(Native Method)

at javax.security.auth.Subject.doAs(Subject.java:396)

at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1093)

at org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java:850)

at org.apache.hadoop.mapreduce.Job.submit(Job.java:500)

at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:530)

at seee.you.app.WordCount.main(WordCount.java:80)

错误信息很明显了,at org.apache.hadoop.fs.FileUtil.checkReturnValue(FileUtil.java:682) 这一行的方法报错了

网上查到这是由于0.20.203.0以后的版本的权限认证引起的,只有去掉才行

修改hadoop源代码,去除权限认证,修改FileUtil.java的checkReturnValue方法,如下:

[java] view plain copy

private static void checkReturnValue(boolean rv, File p,

FsPermission permission

) throws IOException {

// if (!rv) {

// throw new IOException("Failed to set permissions of path: " + p +

// " to " +

// String.format("%04o", permission.toShort()));

// }

}

去掉这一行后,需要重新编译打包下,打包成功之后,可以将hadoop-core-1.0.2.jar拷贝到hadoop根目录下,eclipse中重新导入下即可(我用的这个1.0.2是从网上下载的修改好的,比较省事)

这时重新运行下实例,运行实例需要配置下arguments参数,我的配置如下:

run一下,结果如下,说明已经成功了

[java] view plain copy

WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.

****hdfs://10.16.110.7:9000/user/yongkang/test-in

INFO input.FileInputFormat: Total input paths to process : 0

INFO mapred.JobClient: Running job: job_local_0001

INFO mapred.Task: Using ResourceCalculatorPlugin : null

INFO mapred.LocalJobRunner:

INFO mapred.Merger: Merging 0 sorted segments

INFO mapred.Merger: Down to the last merge-pass, with 0 segments left of total size: 0 bytes

INFO mapred.LocalJobRunner:

INFO mapred.Task: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting

INFO mapred.LocalJobRunner:

INFO mapred.Task: Task attempt_local_0001_r_000000_0 is allowed to commit now

INFO output.FileOutputCommitter: Saved output of task 'attempt_local_0001_r_000000_0' to /user/yongkang/test-out6

INFO mapred.JobClient: map 0% reduce 0%

INFO mapred.LocalJobRunner: reduce reduce

INFO mapred.Task: Task 'attempt_local_0001_r_000000_0' done.

INFO mapred.JobClient: map 0% reduce 100%

INFO mapred.JobClient: Job complete: job_local_0001

INFO mapred.JobClient: Counters: 10

INFO mapred.JobClient: File Output Format Counters

INFO mapred.JobClient: Bytes Written=0

INFO mapred.JobClient: FileSystemCounters

INFO mapred.JobClient: FILE_BYTES_READ=8604

INFO mapred.JobClient: FILE_BYTES_WRITTEN=51882

INFO mapred.JobClient: Map-Reduce Framework

INFO mapred.JobClient: Reduce input groups=0

INFO mapred.JobClient: Combine output records=0

INFO mapred.JobClient: Reduce shuffle bytes=0

INFO mapred.JobClient: Reduce output records=0

INFO mapred.JobClient: Spilled Records=0

INFO mapred.JobClient: Total committed heap usage (bytes)=5177344

INFO mapred.JobClient: Reduce input records=0

2条大神的评论

  • avatar
    访客 2022-07-09 下午 07:45:34

    lass WordCountHbaseReaderReduce extends ReducerText,Text,Text,Text{private Text result = new Text();@Overrideprotected void reduce(Text key, It

  • avatar
    访客 2022-07-10 上午 12:11:20

    to process : 0 INFO mapred.JobClient: Running job: job_local_0001 INFO mapred.Task: Using ResourceCalculatorPlugin : n

发表评论