需求
将 kafka 上的数据实时同步到 HDFS,不能有太多小文件
实现过程
Spark Streaming 支持 RDD#saveAsTextFile,将数据以 纯文本 方式写到 HDFS,我们查看 RDD#saveAsTextFile 可以看到
1 2
| RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null) .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)
|
从上面这句话我们可以知道,首先将 RDD 转化为 PariRDD,然后再调用 saveAsHadoopFile 函数进行实际的操作。上面的语句中 r
是原始 RDD,nullWritableClassTag
和 textClassTag
表示所写数据的类型,使用 nullWritableClassTag
是因为 HDFS 不会将这个数据进行实际写入(pariRDD 是 (K,V) 类型, 我们只需要写入 V),从效果上看就只写如后面的一个字段。TextOutputFormat
是一个格式化函数,后面我们再来看这个函数,NullWritable
则表示一个占位符,同样是这个字段不需要实际写入 HDFS,Text
表示我们将写入文本类型的数据。
我们看到 TextOutputFormat
这个类中有一个函数是 RecordWriter
用于操作没一条记录的写入,代码如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| public RecordWriter<K, V> getRecordWriter(FileSystem ignored, JobConf job, String name, Progressable progress) throws IOException { boolean isCompressed = getCompressOutput(job); String keyValueSeparator = job.get("mapreduce.output.textoutputformat.separator", "\t"); if(!isCompressed) { Path codecClass1 = FileOutputFormat.getTaskOutputPath(job, name); FileSystem codec1 = codecClass1.getFileSystem(job); FSDataOutputStream file1 = codec1.create(codecClass1, progress); return new TextOutputFormat.LineRecordWriter(file1, keyValueSeparator); } else { Class codecClass = getOutputCompressorClass(job, GzipCodec.class); CompressionCodec codec = (CompressionCodec)ReflectionUtils.newInstance(codecClass, job); Path file = FileOutputFormat.getTaskOutputPath(job, name + codec.getDefaultExtension()); FileSystem fs = file.getFileSystem(job); FSDataOutputStream fileOut = fs.create(file, progress); return new TextOutputFormat.LineRecordWriter(new DataOutputStream(codec.createOutputStream(fileOut)), keyValueSeparator); } }
|
文件中分为两部分:1)压缩文件,2)非压缩文件。然后剩下的事情就是打开文件,往文件中写数据了。
说到压缩文件,就和写 lzo 格式关联起来了,因为 lzo 格式就是压缩的,那么我们从哪拿到这个压缩的格式的呢?实际上 PariRDDFunctions#saveAsHadoopFile 还可以传入压缩格式类,函数原型如下
1 2 3
| def saveAsHadoopFile[F <: OutputFormat[K, V]]( path: String, codec: Class[_ <: CompressionCodec])(implicit fm: ClassTag[F]): Unit = self.withScope {
|
这里第二个参数表示压缩的类。如果需要我们传入一个压缩类即可,如 classOf[com.hadoop.compression.lzo.LzopCodec]
最终这个参数会传给 TextOutputFormat#RecordWriter
.
至此,我们以及可以写 lzo 格式的文件了。但是还没有结束,因为会产生小文件,每个 RDD 的每个 partition 都会在 HDFS 上产生一个文件,而且这些文件大小非常小,就形成了很多小文件,这对 HDFS 的压力会非常大。我们需要解决这个问题