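The requirement here is to write to HDFS on a Hadoop cluster from a Flume collection machine that does not have Hadoop installed.
The Flume version is 1.6.0 and the Hadoop version is 2.7.1.
Copy the two configuration files hdfs-site.xml and core-site.xml into the conf directory of the Flume installation, and copy hadoop-hdfs-2.7.1.jar into the Flume lib directory.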
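The copy step can be sketched as a small shell function. It assumes the three files have already been fetched from a Hadoop node into a local staging directory (for example with scp). The function name `install_hdfs_client_files` and the staging path in the usage line are illustrative assumptions, not part of the original setup.

```shell
# Copy the HDFS client files from a staging directory into a Flume installation.
# $1 = staging directory holding hdfs-site.xml, core-site.xml and
#      hadoop-hdfs-2.7.1.jar (assumed layout)
# $2 = Flume installation directory (must already contain conf/ and lib/)
install_hdfs_client_files() {
  staging=$1
  flume_home=$2
  # Refuse to copy anything if one of the expected files is missing.
  for f in hdfs-site.xml core-site.xml hadoop-hdfs-2.7.1.jar; do
    if [ ! -f "$staging/$f" ]; then
      echo "missing $staging/$f" >&2
      return 1
    fi
  done
  cp "$staging/hdfs-site.xml" "$staging/core-site.xml" "$flume_home/conf/" &&
    cp "$staging/hadoop-hdfs-2.7.1.jar" "$flume_home/lib/"
}
```

Usage, with a hypothetical staging directory: install_hdfs_client_files /tmp/hadoop-client /data/apache-flume-1.6.0-bin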
I. Flume configuration file:
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.type = syslogtcp
# IP address of this machine
a1.sources.r1.bind = 192.168.110.160
a1.sources.r1.port = 23003
a1.sources.r1.workerThreads = 10
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000000
a1.channels.c1.transactionCapacity = 100000
a1.channels.c1.keep-alive = 6
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://clusterpc/test/flume/%y-%m-%d
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start the agent: bin/flume-ng agent --conf conf --conf-file conf/flume-tcp-memory-hdfs.conf --name a1 -Dflume.root.logger=info,console
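Once the agent is up, a test event can be pushed to the syslogtcp port to check that the source accepts data. The sketch below assumes `nc` (netcat) is installed. The helper names `syslog_line` and `send_test_event` are made up for illustration, and the message omits the usual syslog timestamp and host fields, so treat it as a connectivity check only.

```shell
# Format a minimal syslog line: <PRI>message, where PRI = facility * 8 + severity.
# $1 = facility number, $2 = severity number, $3 = message text
syslog_line() {
  printf '<%d>%s\n' "$(( $1 * 8 + $2 ))" "$3"
}

# Send one test line (facility 1 = user, severity 5 = notice) to host $1, port $2.
# Requires nc; -w 2 makes it give up after 2 seconds of inactivity.
send_test_event() {
  syslog_line 1 5 "$3" | nc -w 2 "$1" "$2"
}
```

Usage against the source configured above: send_test_event 192.168.110.160 23003 "hello flume"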
II. Errors:
1. Host name not found
2016-09-19 16:15:48,518 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:459)] process failed
java.lang.IllegalArgumentException: java.net.UnknownHostException: cluster
    at org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:378)
    at org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:310)
    at org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:176)
    at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:678)
    at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:619)
    at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:149)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2653)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:92)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2687)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2669)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:371)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:170)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:355)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
    at org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:243)
    at org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:235)
    at org.apache.flume.sink.hdfs.BucketWriter$9$1.run(BucketWriter.java:679)
    at org.apache.flume.auth.SimpleAuthenticator.execute(SimpleAuthenticator.java:50)
    at org.apache.flume.sink.hdfs.BucketWriter$9.call(BucketWriter.java:676)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)
Caused by: java.net.UnknownHostException: cluster
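cluster is the NameService name of the company's Hadoop cluster. The error occurs because the HDFS client cannot resolve that NameService: the createNonHAProxy frame shows that it treated cluster as a plain host name. The fix is to copy hdfs-site.xml to the flume/conf directory.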
2.
java.io.IOException: Mkdirs failed to create /test/flume/16-09-19 (exists=false, cwd=file:/data/apache-flume-1.6.0-bin)
    at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:450)
    at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:435)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:909)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:890)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:787)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:776)
    at org.apache.flume.sink.hdfs.HDFSSequenceFile.open(HDFSSequenceFile.java:96)
    at org.apache.flume.sink.hdfs.HDFSSequenceFile.open(HDFSSequenceFile.java:78)
    at org.apache.flume.sink.hdfs.HDFSSequenceFile.open(HDFSSequenceFile.java:69)
    at org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:246)
    at org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:235)
    at org.apache.flume.sink.hdfs.BucketWriter$9$1.run(BucketWriter.java:679)
    at org.apache.flume.auth.SimpleAuthenticator.execute(SimpleAuthenticator.java:50)
    at org.apache.flume.sink.hdfs.BucketWriter$9.call(BucketWriter.java:676)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)
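Fix: copy core-site.xml to the flume/conf directory. The ChecksumFileSystem frames and the cwd=file:/... in the message show that the path was resolved against the local file system. core-site.xml supplies fs.defaultFS, which points the client at HDFS.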
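Errors 1 and 2 both come down to a client-side XML file that is missing or lacks the expected property. A quick way to confirm what the copied files contain is sketched below. `hadoop_conf_value` is a hypothetical helper, not a Hadoop tool. It assumes the usual layout where `<value>` directly follows `<name>` inside a `<property>` element, and it is not a real XML parser.

```shell
# Print the <value> of one property from a Hadoop-style XML config file.
# $1 = path to the XML file, $2 = property name (dots are matched loosely by the regex)
hadoop_conf_value() {
  # Join the file into a single line, then capture the <value> that follows <name>$2</name>.
  tr -d '\n' < "$1" |
    sed -n "s/.*<name>$2<\/name>[[:space:]]*<value>\([^<]*\)<\/value>.*/\1/p"
}
```

For example, hadoop_conf_value conf/hdfs-site.xml dfs.nameservices should print the NameService name (cluster in the error above), and hadoop_conf_value conf/core-site.xml fs.defaultFS should print an hdfs:// URI. Empty output means the property was not found in that file.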
3.
java.io.IOException: No FileSystem for scheme: hdfs
    at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2644)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2651)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:92)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2687)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2669)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:371)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:170)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:355)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
    at org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:243)
    at org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:235)
    at org.apache.flume.sink.hdfs.BucketWriter$9$1.run(BucketWriter.java:679)
    at org.apache.flume.auth.SimpleAuthenticator.execute(SimpleAuthenticator.java:50)
    at org.apache.flume.sink.hdfs.BucketWriter$9.call(BucketWriter.java:676)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)
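Fix: copy hadoop-hdfs-2.7.1.jar to the flume/lib directory. In Hadoop 2.7.1 this jar contains org.apache.hadoop.hdfs.DistributedFileSystem, the implementation behind the hdfs scheme.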
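Whether the jar is actually in place can be checked by listing the lib directory. The helper name `find_hdfs_jar` below is an assumption for illustration. It only looks at file names and does not inspect the jar contents.

```shell
# List hadoop-hdfs jars in a Flume lib directory.
# $1 = lib directory; returns non-zero when no matching jar is present.
find_hdfs_jar() {
  ls "$1"/hadoop-hdfs-*.jar 2>/dev/null
}
```

Usage: find_hdfs_jar /data/apache-flume-1.6.0-bin/lib || echo "hadoop-hdfs jar missing"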
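4. Insufficient HDFS permissions. The user that writes files to HDFS here is the user logged in to the Flume collection machine (kafka in the log below).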
org.apache.hadoop.security.AccessControlException: Permission denied: user=kafka, access=WRITE, inode="/test/flume/16-09-19/events-.1474268726127.tmp":hadoop:supergroup:drwxr-xr-x
    at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:319)
    at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:292)
    at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:213)
    at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:190)
    at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1698)
    at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1682)
    at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkAncestorAccess(FSDirectory.java:1665)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInternal(FSNamesystem.java:2517)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2452)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:2335)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:623)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:397)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:969)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)
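The write is denied because the kafka user has no write permission on the directory, so permission must be granted: hadoop fs -chmod -R 777 /test/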
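chmod -R 777 opens the whole tree to every user. A narrower alternative, which is not part of the original fix, is to hand the target directory to the account that runs Flume (kafka in the error above). The sketch below only prints the command so it can be reviewed first. `narrow_grant` is a hypothetical helper, and running the printed command is assumed to require an HDFS superuser account.

```shell
# Print (do not execute) a chown that gives one user ownership of one HDFS directory.
# $1 = user that runs the Flume agent, $2 = HDFS directory the sink writes to
narrow_grant() {
  printf 'hadoop fs -chown -R %s %s\n' "$1" "$2"
}
```

Usage: narrow_grant kafka /test/flume prints the line hadoop fs -chown -R kafka /test/flume, which can then be run from a machine that has the Hadoop client installed.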
5. Timestamp
java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null
    at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:204)
    at org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:228)
    at org.apache.flume.formatter.output.BucketPath.escapeString(BucketPath.java:432)
    at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:380)
    at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
    at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
    at java.lang.Thread.run(Thread.java:744)
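The cause is that the Event headers do not contain a timestamp, which the %y-%m-%d escapes in hdfs.path require. Solution: set a1.sinks.k1.hdfs.useLocalTimeStamp = true so that the sink uses the local timestamp.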
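With useLocalTimeStamp enabled, the %y-%m-%d escapes in hdfs.path are filled in from the clock of the agent machine, and date(1) gives these three escapes the same meaning. That makes it easy to work out which directory the sink should be writing to today. `todays_bucket` is an illustrative helper name, and the sketch assumes the shell runs in the same time zone as the agent.

```shell
# Print the HDFS directory the sink would use today for a path of the form <base>/%y-%m-%d.
# $1 = base directory, e.g. /test/flume
todays_bucket() {
  printf '%s/%s\n' "$1" "$(date +%y-%m-%d)"
}
```

Usage from a machine with the Hadoop client: hadoop fs -ls "$(todays_bucket /test/flume)". The round, roundValue and roundUnit settings round the timestamp down to 10 minutes, which does not change the result because the path contains no hour or minute escapes.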