|
![]() |
名片设计 CorelDRAW Illustrator AuotoCAD Painter 其他软件 Photoshop Fireworks Flash |
|
习惯了TCP编程,认为UDP可以包办这些问题是错误的。一个UDP应用程序要承担可靠性方面的全部工作,包括报文的丢失、重复、时延、乱序以及连接失效等问题。 通常我们在可靠性好,传输时延小的局域网上开发测试,一些问题不轻易暴露,但在大型互联网上却会出现错误。 UDP协议把递送的可靠性责任推到了上层即应用层,下面简朴编写了几个类来专门处理两个问题:乱序和丢包。 四个类:DataPacket 类,PacketHeader类,PacketBody类 ,DataEntry类,位于同一个文件DataPacket .java中。 DataPacket 类相称于一个门面模式,提供应外部使用,通信数据也在这个类中处理。 package com.skysoft.pcks; import java.io.*; import java.net.*; import java.util.*; public class DataPacket { InputStream is; OutputStream os; PacketHeader header; PacketBody body; ArrayList al; public static final int DataSwapSize = 64532; /** * 在接收数据报使用 */ public DataPacket() { header = new PacketHeader(); body = new PacketBody(); al = new ArrayList(); } /** * 在发送数据报时使用,它调用报文分割操作. * @param file String 硬盘文件 */ public DataPacket(String file) { this(); try { is = new FileInputStream(file); header.CalcHeaderInfo(is.available()); this.madeBody(); is.close(); //this.Gereratedata(); } catch (FileNotFoundException ex) { ex.printStackTrace(); } catch (IOException ex1) { ex1.printStackTrace(); } } /** * 在发送数据报时使用,它调用报文分割操作. * @param url URL url地址 */ public DataPacket(URL url) { this(); try { //is = url.openStream(); URLConnection conn=url.openConnection(); is=conn.getInputStream(); int total=conn.getContentLength(); header.CalcHeaderInfo(total); this.madeBody(); //System.out.println(total+":"+total); is.close(); } catch (IOException ex) { ex.printStackTrace(); } } /** * 为发送构造分组,使用PackageHeader处理了报头格式,并为分组编序号. */ private void madeBody() { al.clear(); byte[] buffer; DataEntry de; for (int i = 0; i < header.fragmentcounter; i++) { try { ByteArrayOutputStream bos = new ByteArrayOutputStream(); //is.skip(i * body.BODY_BUFFER_SIZE); header.ArrageSort(i); de = new DataEntry(PacketBody.BODY_BUFFER_SIZE); de.setSn(i); de.setStreamsize(header.getStreamsize()); de.setFragmentcounter(header.getFragmentcounter()); if (header.isWTailFragment(i)) { buffer = new byte[header.getMinfragment()]; is.read(buffer, 0, buffer.length); header.setActByteSize(header.getMinfragment()); de.setActByteSize(header.getMinfragment()); } else { buffer = new byte[body.BODY_BUFFER_SIZE]; is.read(buffer, 0, buffer.length); } //System.out.println("length-------"+i+" "+body.getBody().length+" "+header.getMinfragment()); body.setBody(buffer); //System.out.println("length:" + i + " " + header.toString()); bos.write(header.getByte(), 0, header.HEADER_BUFFER_SIZE); bos.write(body.getBody(), 0, body.getBody().length); de.setBytes(bos.toByteArray()); al.add(de); } catch (IOException ex) { ex.printStackTrace(); } } } /** * 为发送构造分组,没有考虑报头格式,也没有为分组编序号. */ private void madeBody1() { al.clear(); for (int i = 0; i < header.fragmentcounter; i++) { try { if (header.isWTailFragment(i)) is.read(body.getBody(), i * body.BODY_BUFFER_SIZE, header.getMinfragment()); else is.read(body.getBody(), i * body.BODY_BUFFER_SIZE, body.BODY_BUFFER_SIZE); ByteArrayOutputStream bos = new ByteArrayOutputStream(); bos.write(header.getByte(), 0, header.HEADER_BUFFER_SIZE); bos.write(body.getBody(), header.HEADER_BUFFER_SIZE, body.getBody().length); al.add(bos); } catch (IOException ex) { ex.printStackTrace(); } } } /** * 在接收到报文后,对此报文执行组装,并处理报文丢失和乱序情况. * @param b1 byte[] */ public void Add(byte[] b1) { byte[] buffer = (byte[]) b1.clone(); handlerText(buffer); DataEntry de = new DataEntry(buffer, header.getActByteSize()); de.setSn(header.getSn()); de.setStreamsize(header.getStreamsize()); de.setFragmentcounter(header.getFragmentcounter()); al.add(de); } private void handlerText(byte[] buffer) { ByteArrayOutputStream baos = new ByteArrayOutputStream(); baos.write(buffer, 0, header.HEADER_BUFFER_SIZE); byte[] b=new byte[header.HEADER_BUFFER_SIZE]; System.arraycopy(buffer,0,b,0,b.length); ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray()); InputStreamReader isr = new InputStreamReader(bais); BufferedReader br = new BufferedReader(isr); try { header = new PacketHeader(br.readLine()); } catch (Exception ex) { ex.printStackTrace(); } } private String calFileSize(int size) { return size / 1024 + "K"; } public ArrayList getDataPackets() { return al; } /** * 是否接收完毕,通过序号是否等于最大段数来判定,这也许有问题,比如,正好是最后一个段丢失了,这样 * 这个包整个就丢失了. * @return */ public boolean isFull() { return this.header.getSn() == this.header.getFragmentcounter() - 1 ? true : false; } /** * 判定是否只有一个段. * @return */ public boolean isZero() { return this.header.getSn() == 0 ? true : false; } /** * 该函数执行报文组装,不考虑丢失的报文. * @return */ private ByteArrayOutputStream fetchDataPackets() { ByteArrayOutputStream bos = new ByteArrayOutputStream(); byte[] buffer = null; DataEntry de; for (int i = 0; i < al.size(); i++) { try { de = this.getSnData(i); buffer = de.getByte(); if (header.getStreamsize() == de.getStreamsize()) { bos.write(de.getByte(), header.HEADER_BUFFER_SIZE, de.getActByteSize()); System.out.println(de.toString() + " -- fetchDataPackets"); } } catch (Exception ex) { ex.printStackTrace(); } } return bos; } /** * 该函数执行报文组装,对于丢失的报文,写入空报文. * @return ByteArrayOutputStream */ private ByteArrayOutputStream fetchDataPackets_sn() { ByteArrayOutputStream bos = new ByteArrayOutputStream(); byte[] buffer; DataEntry de; for (int i = 0; i < header.getFragmentcounter(); i++) { try { de = this.getSnData(i); if (de == null) { de = seachDeData(i); } buffer = de.getByte(); //System.out.println(de.getSn() + ":" + i); //handlerText(buffer); //bos.write(buffer, header.HEADER_BUFFER_SIZE, // buffer.length - header.HEADER_BUFFER_SIZE); if (header.getStreamsize() == de.getStreamsize()) { bos.write(de.getByte(), header.HEADER_BUFFER_SIZE, de.getActByteSize()); //System.out.println(de.toString()); } } catch (Exception ex) { ex.printStackTrace(); } } return bos; } /** * 对缓冲的数据包进行排序处理,即按顺序提取同一帧的数据,假如没有找到该序号的帧,则返回空值. * @param sn int 要找的帧序号. * @return DataEntry */ private DataEntry getSnData(int sn) { DataEntry de = null; for (int i = 0; i < al.size(); i++) { de = (DataEntry) al.get(i); if (header.getStreamsize() == de.getStreamsize()) { if (sn == de.getSn()) break; else de = null; } } return de; } /** * 按序号开始向前或者是向后寻找最近的帧片段,日后可以增加哀求重发功能,通过开一个通信连接. * @param sn int * @return DataEntry */ private DataEntry seachDeData(int sn) { DataEntry de = null; int initvalue, minvalue = 10000; DataEntry back, fore = null; for (int i = 0; i < al.size(); i++) { de = (DataEntry) al.get(i); if (header.getStreamsize() == de.getStreamsize()) { initvalue = Math.abs(de.getSn() - sn); if (de.getFragmentcounter() != de.getSn() && initvalue < minvalue) { minvalue = initvalue; fore = de; } } } return fore; } /** * 除去最后一帧外,随机抽取一帧. * @return DataEntry */ private DataEntry seachDeData() { DataEntry de = null; for (int i = 0; i < al.size(); i++) { de = (DataEntry) al.get(i); System.out.println("sky ::::" + de.getFragmentcounter() + ":" + de.getSn() + ":" + i); if (header.getStreamsize() == de.getStreamsize()) { if (de.getFragmentcounter() != de.getSn()) { break; } } } return de; } /** * 生成组装完的结果数据.因为用图像来做测试,所以令其返回图像. * @return Image */ public java.awt.Image Gereratedata() { ByteArrayInputStream bis; java.awt.image.BufferedImage bimage = null; try { byte[] b = fetchDataPackets_sn().toByteArray(); //fetchDataPackets_old1() bis = new ByteArrayInputStream(b); bimage = javax.imageio.ImageIO.read(bis); } catch (Exception ex1) { ex1.printStackTrace(); } return bimage; } public static void main(String args[]) { DataPacket dp = new DataPacket("e:\\nature\\14.jpg"); } } /** * 数据实体,充当临时处理场所. * @author Administrator * */ class DataEntry { byte[] bytes; int fragmentcounter, sn, actbytesize; long streamsize; int minfragment; public DataEntry() { } public DataEntry(int size) { this.actbytesize = size; } public DataEntry(byte[] b, int i) { this.bytes = b; this.actbytesize = i; } public byte[] getByte() { return this.bytes; } public void setBytes(byte[] b) { this.bytes = b; } public void setStreamsize(long size) { this.streamsize = size; } public long getStreamsize() { return this.streamsize; } public int getMinfragment() { return minfragment; } public synchronized void setSn(int i) { this.sn = i; } public synchronized int getSn() { return sn; } public synchronized int getFragmentcounter() { return fragmentcounter; } public synchronized void setFragmentcounter(int c) { this.fragmentcounter = c; } public void setActByteSize(int size) { actbytesize = size; } public int getActByteSize() { return actbytesize; } public String toString() { return this.streamsize + "::" + this.fragmentcounter + "::" + this.sn + "::" + this.actbytesize + " recv DataEntry"; } } /** * 报头,处理报头格式 * @author Administrator * */ class PacketHeader implements Serializable{ public static final int HEADER_BUFFER_SIZE = 1024; int fragmentcounter, sn; int actbytesize = PacketBody.BODY_BUFFER_SIZE; byte[] header; //= new byte[HEADER_BUFFER_SIZE]; long streamsize; int minfragment; public PacketHeader() { } public PacketHeader(long l) { this.setStreamsize(l); } public PacketHeader(String s) { String[] tm = s.split("::"); this.setActByteSize(Integer.parseInt(tm[3])); this.setSn(Integer.parseInt(tm[2])); this.setFragmentcounter(Integer.parseInt(tm[1])); this.setStreamsize(Long.parseLong(tm[0])); } /** * 根据文件的段的顺序生成数据头. * @param sn 文件序列 */ public void ArrageSort(int sn) { this.setSn(sn); this.setByte(); } public void CalcHeaderInfo(long l) { this.setStreamsize(l); CalcHeaderInfo(); } /** * 计算流要被分成的片段数量,并得出最小片段余量. */ public void CalcHeaderInfo() { fragmentcounter = Math.round( (float) streamsize / PacketBody.BODY_BUFFER_SIZE); float critical = (float) streamsize / PacketBody.BODY_BUFFER_SIZE; if (critical - fragmentcounter < 0.5 && critical - fragmentcounter > 0) fragmentcounter++; minfragment = (int) (streamsize % PacketBody.BODY_BUFFER_SIZE); } public byte[] getHeader() { Long it = new Long(this.streamsize); return new byte[] {it.byteValue()}; } public byte[] getByte() { return header; //this.toString().getBytes(); } /** * 生成报头字节,首先取得数据包头 流尺寸::段片数::段顺序::段实际尺寸 的字节形式, * 然后加入回车换行符号,对于1024字节中剩余的部分一律写入元素为0的字节数组. */ public void setByte() { ByteArrayOutputStream bos = new ByteArrayOutputStream(); byte[] buffer = this.toByte(); try { bos.write(buffer); bos.write("rn".getBytes()); bos.write(new byte[PacketHeader.HEADER_BUFFER_SIZE - buffer.length], 0, PacketHeader.HEADER_BUFFER_SIZE - buffer.length); header = bos.toByteArray(); } catch (IOException ex) { ex.printStackTrace(); } } public void setStreamsize(long size) { this.streamsize = size; } public long getStreamsize() { return this.streamsize; } public int getMinfragment() { return minfragment; } public synchronized void setSn(int i) { this.sn = i; } public int getSn() { return sn; } public int getFragmentcounter() { return fragmentcounter; } public synchronized void setFragmentcounter(int c) { this.fragmentcounter = c; } public void setActByteSize(int size) { actbytesize = size; setByte(); } public int getActByteSize() { return actbytesize; } /** * 数据包头的格式为:流尺寸::段片数::段顺序::段实际尺寸 * 报头字节长度是可变化的,比如,可以加入流的详细信息如:流所属文件的名称,文件类型以及一些其他信息. * @return String */ public String toString() { return streamsize + "::" + this.fragmentcounter + "::" + this.getSn() + "::" + this.getActByteSize(); } public byte[] toByte() { return this.toString().getBytes(); } /** * 是否为尾段 * @param i int * @return boolean */ public boolean isWTailFragment(int i) { return (i == fragmentcounter - 1) ? true : false; } } /** * 用户数据区 * @author Administrator * */ class PacketBody implements Serializable{ public static final int BODY_BUFFER_SIZE = 63508; //65508 byte[] body; public PacketBody() { } public void setBody(byte[] b) { this.body = b; } public byte[] getBody() { return body; } } 这个数据处理类,将在接下来使用。 返回类别: 教程 上一教程: 讲解关于编写跨平台JAVA程序时的注重事项 下一教程: 制作可以执行的 JAR 文件包及 JAR 命令详解 您可以阅读与"JAVA数据报之失序和丢包"相关的教程: · Java数据报之失序和丢包 · Java数据报编程之概说 · JAVA数据报编程之单播 · Java数据报编程之组播 · 使用JAVA实现数据报通讯过程 |
![]() ![]() |
快精灵印艺坊 版权所有 |
首页![]() ![]() ![]() ![]() ![]() ![]() ![]() |