带序列号的UDP

我正在尝试为Java中的类赋值实现可靠的UDP协议。 我已经设法将确认添加到收到的每个数据报包中,但是我在发送的数据报包中实现序列号时遇到了问题。

谁能建议一个简单的方法来实现这个?

@EJP我试过实现你刚刚建议的内容。 这是我的代码,直到现在(它仍然非常原始 – 我正在使用命中和尝试方法来实现它)

服务器端

public class TestServer extends Activity { private DatagramSocket serverSocket; Thread serverThread = null; byte[] incomingData; byte[] outgoingData; //int numBytesRead = 0; int ackSent = 0; int numPackRecv = 0; int BUF_SIZE = 1024; String msg = "ACK"; BufferedInputStream data=null; BufferedOutputStream out =null; public static final int SERVERPORT = 6000; String outputFile = "/sdcard/Movies/asddcopy.mp4"; @Override public void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_test_server); this.serverThread = new Thread(new ServerThread()); this.serverThread.start(); } @Override protected void onStop() { super.onStop(); try { serverSocket.close(); } catch (Exception e) { Log.d("SERVER", "Inside onStop()"); Log.d("SERVER", Log.getStackTraceString(e)); } } class ServerThread implements Runnable { @SuppressLint("NewApi") public void run() { try { serverSocket = new DatagramSocket(SERVERPORT); incomingData = new byte[BUF_SIZE]; //outgoingData = new byte[512]; outgoingData = msg.getBytes(); long startRxPackets = TrafficStats.getUidRxPackets(Process.myUid()); long startTime = System.nanoTime(); out = new BufferedOutputStream(new FileOutputStream(outputFile, true)); while (!Thread.currentThread().isInterrupted()) { //serverSocket.setSoTimeout(5000); while (true) { try{ //DatagramPacket incomingPacket = new DatagramPacket(incomingData, incomingData.length); DatagramPacket incomingPacket = new DatagramPacket(incomingData, BUF_SIZE); serverSocket.receive(incomingPacket); byte[] data = incomingPacket.getData(); //out.write(data,0,incomingPacket.getLength()); //String msg = new String(incomingPacket.getData()); ByteArrayInputStream in = new ByteArrayInputStream(data); ObjectInputStream is = new ObjectInputStream(in); if (is == null) { is = new ObjectInputStream(in); } Message msg = (Message) is.readObject(); System.out.println(msg.getSeqNo()); /*if ("END".equals(msg.substring(0, 3).trim())) { Log.d("SERVER", "Inside END condition"); break; }*/ out.write(msg.getData(),0,msg.getData().length); numPackRecv += 1; Log.d("SERVER", "Packet Received: " + numPackRecv); InetAddress client = incomingPacket.getAddress(); int client_port = incomingPacket.getPort(); DatagramPacket outgoingPacket = new DatagramPacket(outgoingData, outgoingData.length, client, client_port); serverSocket.send(outgoingPacket); ackSent += 1; //Log.d("SERVER","Packet Received: " + numPackRecv + " :: " + "Ack Sent: " + ackSent); }catch(Exception e) { Log.d("SERVER", "Inside run() ex1"); Log.d("SERVER", Log.getStackTraceString(e)); break; } } out.close(); serverSocket.disconnect(); serverSocket.close(); Log.d("SERVER", "Transfer Complete"); Log.d("SERVER", "Actual Time elapsed = " + (System.nanoTime() - startTime)/Math.pow(10, 9) + " s"); Log.d("SERVER", "Total Packets Received = " + Long.toString(TrafficStats.getUidRxPackets(Process.myUid()) - startRxPackets)); Log.d("SERVER", "Packets Received from Socket = " + numPackRecv); break; } out.close(); serverSocket.disconnect(); serverSocket.close(); /* Log.d("SERVER", "Transfer Complete"); Log.d("SERVER", "Actual Time elapsed = " + (System.nanoTime() - startTime)/Math.pow(10, 9) + " s"); Log.d("SERVER", "Total Packets Received = " + Long.toString(TrafficStats.getUidRxPackets(Process.myUid()) - startRxPackets)); Log.d("SERVER", "Packets Received from Socket = " + numPackRecv);*/ }catch (Exception e) { Log.d("SERVER", "Inside run() ex2"); Log.d("SERVER", Log.getStackTraceString(e)); serverSocket.disconnect(); serverSocket.close(); } } } 

这是客户端

  public class TestClient extends Activity { private DatagramSocket clientSocket; byte[] incomingData; int BUF_SIZE = 500; int numBytesRead = 0; int numPackSent = 0; private static final int SERVERPORT = 6000; private static final String SERVER_IP = "10.0.0.22"; String inFile = "/sdcard/Movies/asdd.mp4"; @Override public void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_test_client); new Thread(new ClientThread()).start(); } public void onClick(View view) { new workInProgress().execute(""); } private class workInProgress extends AsyncTask { @SuppressLint("NewApi") @Override protected Object doInBackground(Object... params) { try { Log.d("CLIENT", "Sending a file to the server..."); BufferedInputStream inputBuf = new BufferedInputStream(new FileInputStream(inFile)); //byte[] fileBytes = new byte[(int) inFile.length()]; byte[] fileBytes = new byte[BUF_SIZE]; incomingData = new byte[BUF_SIZE]; double numPktToSend = Math.ceil(inFile.length()*1.0/BUF_SIZE); //Log.d("CLIENT", "Total packets to be sent = " + numPktToSend); int sleepCycle = 1; long sysPackSent = 0; //long startTxPackets = TrafficStats.getTotalTxPackets(); long startTxPackets = TrafficStats.getUidTxPackets(Process.myUid()); Log.d("CLIENT", "startTxPacks: " + startTxPackets); long packDrops = 0; long startTime = System.nanoTime(); long count=0; long ackRec=0; int seqNo = 0; ByteArrayOutputStream outStream = new ByteArrayOutputStream(); ObjectOutputStream os = new ObjectOutputStream(outStream); while((numBytesRead = inputBuf.read(fileBytes)) != -1) { //DatagramPacket packet = new DatagramPacket(fileBytes, fileBytes.length); if (os == null) { os = new ObjectOutputStream(outStream); } Message msg = new Message(++seqNo, fileBytes, false); os.writeObject(msg); os.flush(); os.reset(); byte[] data = outStream.toByteArray(); DatagramPacket packet = new DatagramPacket(data, data.length); clientSocket.send(packet); numPackSent += 1; //Log.d("CLIENT", "No of packets sent = " + numPackSent); sysPackSent = TrafficStats.getUidTxPackets(Process.myUid()) - startTxPackets; try{ clientSocket.setSoTimeout(5000); packet = new DatagramPacket(incomingData, incomingData.length); clientSocket.receive(packet); String recAck = new String(packet.getData()); ackRec++; } catch(Exception e) { //Log.d("CLIENT", Log.getStackTraceString(e)); } packDrops = numPackSent - ackRec; if (packDrops > count) { sleepCycle = Math.min(16, sleepCycle * 2); count = packDrops; Log.d("CLIENT",String.valueOf(sleepCycle) + " :: " + numPackSent); } else { sleepCycle = Math.max(sleepCycle - 1, 1); } Thread.sleep(sleepCycle); } if (numBytesRead == -1) { fileBytes = "END".getBytes(); Log.d("CLIENT", "Sending END Packet"); clientSocket.send(new DatagramPacket(fileBytes, fileBytes.length)); } Log.d("CLIENT", "Actual Time elapsed = " + (System.nanoTime() - startTime)/Math.pow(10, 9) + " s"); Log.d("CLIENT", "Total Packets Transmitted = " + Long.toString(sysPackSent)); Log.d("CLIENT", "No of packets dropped = " + String.valueOf(packDrops)); Log.d("CLIENT", "Packets Pushed to Socket = " + numPackSent); Log.d("CLIENT", "Number of Acknoledgments received " +ackRec); inputBuf.close(); os.close(); outStream.close(); clientSocket.disconnect(); clientSocket.close(); Log.d("CLIENT", "Sending file.. Complete!!!"); } catch (Exception e) { Log.d("CLIENT", Log.getStackTraceString(e)); clientSocket.disconnect(); clientSocket.close(); } return null; } } class ClientThread implements Runnable { @Override public void run() { try { InetAddress serverAddr = InetAddress.getByName(SERVER_IP); clientSocket = new DatagramSocket(); clientSocket.connect(serverAddr, SERVERPORT); Log.d("CLIENT", "Connection Successful"); } catch (UnknownHostException e1) { Log.d("CLIENT", "Inside run() UnknownHostEx"); Log.d("CLIENT", Log.getStackTraceString(e1)); } catch (IOException e1) { Log.d("CLIENT", "Inside run() IOEx"); Log.d("CLIENT", Log.getStackTraceString(e1)); } } } 

我在服务器端遇到一些错误:

  1. 我收到每个数据包的相同序列号(即1)
  2. 我不确定传入数据包的缓冲区大小,因为我在客户端使用500个字节,在服务器端使用1024个字节。 如果我在这两个代码中占用500个字节,则会收到文件结束exception。

如果你能提出更好的方法来实现同样的事情我真的很感激! 谢谢 :)

谢谢!

  1. 创建一个ByteArrayOutputStream.
  2. 将其包装在DataOutputStream
  3. 使用DataOutputStream.writeInt()来编写序列号。
  4. 使用write()写入数据。
  5. ByteArrayOutputStream.返回的字节数组构造DatagramPacket ByteArrayOutputStream.

在接收器处,完全相反,在每种情况下使用补充类和方法。 这些是留给读者的练习。

最简单的方法可能是查看TCP协议,并将所有TCP标头粘贴到每个UDP数据包的开头。