Commons Net FTPClient与Mule无限期挂起

我遇到了Mule ESB FTP传输的问题:轮询时,运行客户端的线程将无限期挂起而不会抛出错误。 这会导致FTP轮询完全停止。 Mule使用Apache Commons Net FTPClient。

进一步研究代码,我认为这是由FTPClient的SocketTimeout没有设置引起的,有时候在从FTPClient的套接字读取行时会导致无限挂起。

当问题发生时,我们可以清楚地看到用jstack检索到的这些堆栈中的问题。 __getReply()函数似乎是问题的更直接链接。

这个在创建新的FTPClient时挂在connect()调用上:

receiver.172 prio=10 tid=0x00007f23e43c8800 nid=0x2d5 runnable [0x00007f24c32f1000] java.lang.Thread.State: RUNNABLE at java.net.SocketInputStream.socketRead0(Native Method) at java.net.SocketInputStream.read(SocketInputStream.java:152) at java.net.SocketInputStream.read(SocketInputStream.java:122) at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:283) at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:325) at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:177) - locked  (a java.io.InputStreamReader) at java.io.InputStreamReader.read(InputStreamReader.java:184) at java.io.BufferedReader.fill(BufferedReader.java:154) at java.io.BufferedReader.readLine(BufferedReader.java:317) - locked  (a java.io.InputStreamReader) at java.io.BufferedReader.readLine(BufferedReader.java:382) at org.apache.commons.net.ftp.FTP.__getReply(FTP.java:294) at org.apache.commons.net.ftp.FTP._connectAction_(FTP.java:364) at org.apache.commons.net.ftp.FTPClient._connectAction_(FTPClient.java:540) at org.apache.commons.net.SocketClient.connect(SocketClient.java:178) at org.mule.transport.ftp.FtpConnectionFactory.makeObject(FtpConnectionFactory.java:33) at org.apache.commons.pool.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:1188) at org.mule.transport.ftp.FtpConnector.getFtp(FtpConnector.java:172) at org.mule.transport.ftp.FtpConnector.createFtpClient(FtpConnector.java:637) at org.mule.transport.ftp.FtpMessageReceiver.listFiles(FtpMessageReceiver.java:134) at org.mule.transport.ftp.FtpMessageReceiver.poll(FtpMessageReceiver.java:94) at org.mule.transport.AbstractPollingMessageReceiver.performPoll(AbstractPollingMessageReceiver.java:216) at org.mule.transport.PollingReceiverWorker.poll(PollingReceiverWorker.java:80) at org.mule.transport.PollingReceiverWorker.run(PollingReceiverWorker.java:49) at org.mule.transport.TrackingWorkManager$TrackeableWork.run(TrackingWorkManager.java:267) at org.mule.work.WorkerContext.run(WorkerContext.java:286) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Locked ownable synchronizers: -  (a java.util.concurrent.ThreadPoolExecutor$Worker) 

使用listFiles()时,另一个挂在pasv()上的调用:

 receiver.137" prio=10 tid=0x00007f23e433b000 nid=0x7c06 runnable [0x00007f24c2fee000] java.lang.Thread.State: RUNNABLE at java.net.SocketInputStream.socketRead0(Native Method) at java.net.SocketInputStream.read(SocketInputStream.java:152) at java.net.SocketInputStream.read(SocketInputStream.java:122) at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:283) at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:325) at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:177) - locked  (a java.io.InputStreamReader) at java.io.InputStreamReader.read(InputStreamReader.java:184) at java.io.BufferedReader.fill(BufferedReader.java:154) at java.io.BufferedReader.readLine(BufferedReader.java:317) - locked  (a java.io.InputStreamReader) at java.io.BufferedReader.readLine(BufferedReader.java:382) at org.apache.commons.net.ftp.FTP.__getReply(FTP.java:294) at org.apache.commons.net.ftp.FTP.sendCommand(FTP.java:490) at org.apache.commons.net.ftp.FTP.sendCommand(FTP.java:534) at org.apache.commons.net.ftp.FTP.sendCommand(FTP.java:583) at org.apache.commons.net.ftp.FTP.pasv(FTP.java:882) at org.apache.commons.net.ftp.FTPClient._openDataConnection_(FTPClient.java:497) at org.apache.commons.net.ftp.FTPClient.initiateListParsing(FTPClient.java:2296) at org.apache.commons.net.ftp.FTPClient.initiateListParsing(FTPClient.java:2269) at org.apache.commons.net.ftp.FTPClient.initiateListParsing(FTPClient.java:2189) at org.apache.commons.net.ftp.FTPClient.initiateListParsing(FTPClient.java:2132) at org.mule.transport.ftp.FtpMessageReceiver.listFiles(FtpMessageReceiver.java:135) at org.mule.transport.ftp.FtpMessageReceiver.poll(FtpMessageReceiver.java:94) at org.mule.transport.AbstractPollingMessageReceiver.performPoll(AbstractPollingMessageReceiver.java:216) at org.mule.transport.PollingReceiverWorker.poll(PollingReceiverWorker.java:80) at org.mule.transport.PollingReceiverWorker.run(PollingReceiverWorker.java:49) at org.mule.transport.TrackingWorkManager$TrackeableWork.run(TrackingWorkManager.java:267) at org.mule.work.WorkerContext.run(WorkerContext.java:286) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Locked ownable synchronizers: -  (a java.util.concurrent.ThreadPoolExecutor$Worker) 

我认为问题是由在Mule默认FtpConnectionFactory中使用默认的FTPClient构造函数(扩展SocketClient)引起的。

请注意,setConnectTimeout()值似乎仅在调用socket.connect()时使用,但在使用相同套接字的其他操作中被忽略:

 protected FTPClient createFtpClient() { FTPClient ftpClient = new FTPClient(); ftpClient.setConnectTimeout(connectionTimeout); return ftpClient; } 

它使用FTPClient()构造函数,它本身使用SocketClient,在创建套接字时定义了0超时。

 public SocketClient() { ... _timeout_ = 0; ... } 

然后我们调用connectc(),它调用_ connectAction()_。

在SocketClient中:

 protected void _connectAction_() throws IOException { ... _socket_.setSoTimeout(_timeout_); ... } 

在FTP中,新的Reader与我们的永久套接字一起实现:

 protected _connectAction_(){ ... _controlInput_ = new BufferedReader(new InputStreamReader(_socket_.getInputStream(), getControlEncoding())); ... } 

然后,当调用__getReply()函数时,我们使用这个带有永久插件的Reader:

 private void __getReply() throws IOException { ... String line = _controlInput_.readLine(); ... } 

对不起,很长的post,但我认为这需要正确的解释。 解决方案可能是在connect()之后调用setSoTimeout()来定义套接字超时。

具有默认超时似乎不是可接受的解决方案,因为每个用户可能具有不同的需求并且默认在任何情况下都不合适。 https://issues.apache.org/jira/browse/NET-35

最后,这提出了两个问题:

  1. 这对我来说似乎是个错误,因为它会完全停止FTP轮询而不会出错。 你怎么看?
  2. 有什么可以避免这种情况的简单方法? 使用自定义FtpConnectionFactory调用setSoTimeout()? 我错过了某处的配置或参数吗?

谢谢你提前。

编辑:我正在使用Mule CE Standalone 3.5.0,它似乎使用Apache Commons Net 2.0。 但是看一下代码,Mule CE Standalone 3.7和Commons Net 2.2似乎并没有什么不同。 以下是涉及的源代码:

https://github.com/mulesoft/mule/blob/mule-3.5.x/transports/ftp/src/main/java/org/mule/transport/ftp/FtpConnectionFactory.java

http://grepcode.com/file/repo1.maven.org/maven2/commons-net/commons-net/2.0/org/apache/commons/net/SocketClient.java

http://grepcode.com/file/repo1.maven.org/maven2/commons-net/commons-net/2.0/org/apache/commons/net/ftp/FTP.java

http://grepcode.com/file/repo1.maven.org/maven2/commons-net/commons-net/2.0/org/apache/commons/net/ftp/FTPClient.java

在一个理想的世界中,超时不应该是必要的,但它看起来就像你的情况一样。

您的描述非常全面,您是否考虑过提出错误 ?

要解决此问题,我建议首先在高级选项卡中使用“响应超时”。 如果那不起作用,我会使用服务覆盖 ,从那里你应该能够覆盖接收器。

我在之前的两个案例中使用MockFtpServer重现了错误,并且我能够使用FtpConnectionFactory来解决这个问题。

 public class SafeFtpConnectionFactory extends FtpConnectionFactory{ //define a default timeout public static int defaultTimeout = 60000; public static synchronized int getDefaultTimeout() { return defaultTimeout; } public static synchronized void setDefaultTimeout(int defaultTimeout) { SafeFtpConnectionFactory.defaultTimeout = defaultTimeout; } public SafeFtpConnectionFactory(EndpointURI uri) { super(uri); } @Override protected FTPClient createFtpClient() { FTPClient client = super.createFtpClient(); //Define the default timeout here, which will be used by the socket by default, //instead of the 0 timeout hanging indefinitely client.setDefaultTimeout(getDefaultTimeout()); return client; } } 

然后将它连接到我的连接器:

    

使用此配置,将在指定的超时后抛出java.net.SocketTimeoutException,例如:

 java.net.SocketTimeoutException: Read timed out at java.net.SocketInputStream.socketRead0(Native Method) at java.net.SocketInputStream.read(SocketInputStream.java:152) at java.net.SocketInputStream.read(SocketInputStream.java:122) at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:283) at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:325) at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:177) at java.io.InputStreamReader.read(InputStreamReader.java:184) at java.io.BufferedReader.fill(BufferedReader.java:154) at java.io.BufferedReader.readLine(BufferedReader.java:317) at java.io.BufferedReader.readLine(BufferedReader.java:382) at org.apache.commons.net.ftp.FTP.__getReply(FTP.java:294) at org.apache.commons.net.ftp.FTP._connectAction_(FTP.java:364) at org.apache.commons.net.ftp.FTPClient._connectAction_(FTPClient.java:540) at org.apache.commons.net.SocketClient.connect(SocketClient.java:178) at org.mule.transport.ftp.FtpConnectionFactory.makeObject(FtpConnectionFactory.java:33) at org.apache.commons.pool.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:1188) at org.mule.transport.ftp.FtpConnector.getFtp(FtpConnector.java:172) at org.mule.transport.ftp.FtpConnector.createFtpClient(FtpConnector.java:637) ... 

否则,尝试connect()或pasv()将无限期挂起而不响应服务器响应。 我使用模拟FTP重现了这个确切的行为。

注意:我使用了setDefaultTimeout(),因为它似乎是与connect()和connectAction()(来自SocketClient源)一起使用的变量:

 public abstract class SocketClient { ... protected void _connectAction_() throws IOException { ... _socket_.setSoTimeout(_timeout_); ... } ... public void setDefaultTimeout(int timeout) { _timeout_ = timeout; } ... } 

编辑:对于那些感兴趣的人,这里是用于重现永不应答服务器的模拟FTP的测试代码。 然而,无限循环远非良好的实践。 它应该被替换为类似sleep的内容,其中包含一个期望SocketTimeoutexception的封闭Test类,并确保在给定的超时后失败。

  private static final int CONTROL_PORT = 2121; public void startStubFtpServer(){ FakeFtpServer fakeFtpServer = new FakeFtpServer(); //define the command which should never be answered fakeFtpServer.setCommandHandler(CommandNames.PASV, new EverlastingCommandHandler()); //fakeFtpServer.setCommandHandler(CommandNames.CONNECT, new EverlastingConnectCommandHandler()); //or any other command... //server config ... //start server fakeFtpServer.setServerControlPort(CONTROL_PORT); fakeFtpServer.start(); ... } //will cause any command received to never have an answer public class EverlastingConnectCommandHandler extends org.mockftpserver.core.command.AbstractStaticReplyCommandHandler{ @Override protected void handleCommand(Command cmd, Session session, InvocationRecord rec) throws Exception { while(true){ try { Thread.sleep(60000); } catch (InterruptedException e) { //TODO } } } } public class EverlastingCommandHandler extends AbstractFakeCommandHandler { @Override protected void handle(Command cmd, Session session) { while(true){ try { Thread.sleep(60000); } catch (InterruptedException e) { //TODO } } } };