Fixing the net.jpountz.lz4.LZ4BlockInputStream NoSuchMethodError in Spark Streaming

21/04/21 14:40:00 ERROR Executor: Exception in task 0.0 in stage 16.0 (TID 22)
java.lang.NoSuchMethodError: net.jpountz.lz4.LZ4BlockInputStream.<init>(Ljava/io/InputStream;Z)V
at org.apache.spark.io.LZ4CompressionCodec.compressedInputStream(CompressionCodec.scala:122)
at org.apache.spark.serializer.SerializerManager.wrapForCompression(SerializerManager.scala:163)
at org.apache.spark.serializer.SerializerManager.wrapStream(SerializerManager.scala:124)
at org.apache.spark.shuffle.BlockStoreShuffleReader$$anonfun$3.apply(BlockStoreShuffleReader.scala:50)
at org.apache.spark.shuffle.BlockStoreShuffleReader$$anonfun$3.apply(BlockStoreShuffleReader.scala:50)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:426)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:62)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:153)
at org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:50)
at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:84)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:105)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
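
This NoSuchMethodError usually means that an older net.jpountz lz4 jar (for example one pulled in transitively by an older Kafka client) is shadowing the newer lz4-java artifact that Spark's LZ4 codec was compiled against, so the two-argument LZ4BlockInputStream constructor is missing at runtime. As a rough diagnostic sketch (not part of the original program), you can print which jar the class was actually loaded from:

// Hypothetical diagnostic snippet: locate the jar that provides LZ4BlockInputStream.
// getCodeSource may be null for bootstrap classes, but for a classpath jar it returns the jar's URL.
val lz4Jar = classOf[net.jpountz.lz4.LZ4BlockInputStream]
  .getProtectionDomain
  .getCodeSource
  .getLocation
println(s"LZ4BlockInputStream loaded from: $lz4Jar")

If the printed path points to an old lz4 jar rather than lz4-java, that version conflict is the root cause.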

The code that triggers the error is as follows:

package streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreaming_WordCount {
  def main(args: Array[String]): Unit = {
    // Create the streaming environment.
    // StreamingContext takes two parameters:
    // the first is the environment configuration
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStream")
    // the second is the batch interval (collection interval)
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    // Processing logic: read lines from the socket port
    val lines = ssc.socketTextStream("hadoop01", 5555)
    val words = lines.flatMap(_.split(" "))
    val wordToOne = words.map((_, 1))
    val wordToCount = wordToOne.reduceByKey(_ + _)
    wordToCount.print()

    // Shutting down the environment:
    // the Spark Streaming receiver is a long-running task, so it cannot simply be stopped here.
    // If the main method returns, the application exits as well, so main must not be allowed to finish.
    // ssc.stop()
    // 1. Start the receiver
    ssc.start()
    // 2. Wait for the receiver to terminate
    ssc.awaitTermination()
  }
}
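
To feed data into this example, you can start a simple text server on hadoop01 (for instance with netcat, nc -lk 5555) and type words into it; each 3-second batch then prints the word counts.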

Solution:
Add .set("spark.io.compression.codec", "snappy") to the SparkConf:

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStream").set("spark.io.compression.codec", "snappy")
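
Switching the codec to snappy sidesteps the LZ4 class conflict rather than resolving it. An alternative, sketched below for an sbt build (the coordinates and version are assumptions; match the lz4-java version to your Spark release), is to align the lz4 dependency itself:

// A minimal sketch for build.sbt, assuming Spark 2.3.x, which ships lz4-java 1.4.x.
// Force the newer artifact that provides the two-argument LZ4BlockInputStream constructor.
libraryDependencies += "org.lz4" % "lz4-java" % "1.4.0"

// If an old net.jpountz.lz4:lz4 jar is pulled in transitively (for example by an older Kafka client),
// excluding it removes the conflicting classes from the classpath.
excludeDependencies += ExclusionRule("net.jpountz.lz4", "lz4")

With the dependency aligned, the default spark.io.compression.codec (lz4) can be kept.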