6.9 批量并行读写远程文件和事务补偿处理

FileBatch类支持批量并行读写操作,包括对FttpAdapter和FileAdapter的支持,它跟并行读写的区别是不需要检查结果,会等到所有并行读写任务全部完成才返回,并在发生异常时提供事务补偿支持。

1.批量并行读

  1. public Result<byte[]>[] readAllBatch(TryByteReadAdapter[] fras)

实现对多个FttpReadAdapter任务的批量读,输入一个FttpReadAdapter数组,并行进行它们的读取,直到每个FttpReadAdapter读完后,以数组的方式批量输出它们对应的结果,比如:

  1. FttpReadAdapter[] fras = new FttpReadAdapter[3];
  2. fras[0]=new FttpAdapter(fttppath).getFttpReader(0,5);
  3. fras[1]=new FttpAdapter(fttppath).getFttpReader(5,5);
  4. fras[2]=new FttpAdapter(fttppath).getFttpReader(10,5);
  5. Result<byte[]>[] rs = new FileBatch().readAllBatch(fras);

上面代码表示并行从3个位置读一个文件内容,等全部读完后,将对应的结果放在一个数组中返回。

2.批量并行写

  1. FttpWriteAdapter[] fwas = new FttpWriteAdapter[3];
  2. fwas[0]=new FttpAdapter(fttppath).getFttpWriter(0,5);
  3. fwas[1]=new FttpAdapter(fttppath).getFttpWriter(5,5);
  4. fwas[2]=new FttpAdapter(fttppath).getFttpWriter(10,5);
  5. Result<Integer>[] rs = new FileBatch().writeBatch(fwas, "abcde".getBytes());

上面代码表示并行对一个文件的3个位置写入"abcde"字符,等全部写完后,返回对应的结果数组。

6.9 批量并行读写远程文件和事务补偿处理 - 图1注意

这里与并行读写一样,3个FttpReadAdapter或者FttpWriteAdapter是由3个不同的FttpAdapter生成,而不是同一个生成。

3.批量并行读写

  1. Result<Integer>[] rs = new FileBatch().readWriteBatch(fras,fwas);

上面的代码表示将前面的批量读和批量写在一个过程中完成,从fras中每个FttpReadAdapter读,然后通过fwas中对应的每FttpWriteAdapter写入,所有读写完成后返回写入结果数组。

4.事务补偿处理

在批量并行读写过程中,如果其中一个FttpReadAdapter或FttpWriteAdapter发生错误,那么框架会进行分布式事务处理,进行两阶段提交,然后调用undo操作进行事务补偿处理,撤销已经产生的改动和影响。

FileBatch类提供了对undo方法的定义:

  1. public Result[] undo(Result[] rtarr)

rtarr是传入的结果,然后返回执行undo撤销处理后的结果。

比如调用readAllBatch发生错误,FileBatch会将结果传入undo进行撤销操作,然后才返回结果。

因此开发者需要自己实现undo方法的内容,继承FileBatch类覆盖undo方法:

  1. public Result[] undo(Result[] rtarr){
  2. for(int i=0;i<rtarr.length;i++){
  3. if(rtarr[i].getStatus()==Result.EXCEPTION)
  4. System.out.println("Result index"+i+" Error");
  5. }
  6.  
  7. return rtarr;
  8. }

上面的undo方法将发生异常的结果的序号输出显示。

所有的批量读写方法都可以以排它的方式进行,只需指定boolean locked参数即可。

另外,除了支持byte批量并行读写外,也支持所有的整型批量并行读写,提供的API和操作几乎类似。

FttpBatchWriteReadDemo演示了一个批量并行读、批量并行写、批量并行读写操作和事务补偿操作。

运行步骤如下:

1)启动ParkServerDemo:

  1. java -cp fourinone.jar; ParkServerDemo

2)在192.168.0.1机器上启动FttpServer:

  1. java -cp fourinone.jar; FttpServer 192.168.0.1

3)在本地运行FttpBatchWriteReadDemo:

  1. java -cp fourinone.jar; FttpMulWriteReadDemo

完整demo源码如下:

  1. // FttpBatchWriteReadDemo
  2. import com.fourinone.FttpAdapter;
  3. import com.fourinone.FttpException;
  4. import com.fourinone.Result;
  5. import com.fourinone.FttpAdapter.FttpReadAdapter;
  6. import com.fourinone.FttpAdapter.FttpWriteAdapter;
  7. import com.fourinone.FileBatch;
  8.  
  9. public class FttpBatchWriteReadDemo extends FileBatch
  10. {
  11. public void fttpBatchWrite(){
  12. try{
  13. String fttppath = "fttp://192.168.0.1/home/log/1.log";
  14. FttpWriteAdapter[] fwas = new FttpWriteAdapter[3];
  15.  
  16. FttpAdapter fa0 = new FttpAdapter(fttppath);
  17. fwas[0]=fa0.getFttpWriter(0,5);
  18.  
  19. FttpAdapter fa1 = new FttpAdapter(fttppath);
  20. fwas[1]=fa1.getFttpWriter(5,5);
  21.  
  22. FttpAdapter fa2 = new FttpAdapter(fttppath);
  23. fwas[2]=fa2.getFttpWriter(10,5);
  24.  
  25. Result<Integer>[] rs = this.writeBatch(fwas, "abcde".getBytes());
  26.  
  27. System.out.println(rs[0].getResult());
  28. System.out.println(rs[1].getResult());
  29. System.out.println(rs[2].getResult());
  30.  
  31. fa0.close();
  32. fa1.close();
  33. fa2.close();
  34. }catch(FttpException fe){
  35. fe.printStackTrace();
  36. }
  37. }
  38.  
  39. public void fttpBatchRead(){
  40. try{
  41. String fttppath = "fttp://192.168.0.1/home/log/1.log";
  42.  
  43. FttpReadAdapter[] fras = new FttpReadAdapter[3];
  44.  
  45. FttpAdapter fa0 = new FttpAdapter(fttppath);
  46. fras[0]=fa0.getFttpReader(0,5);
  47.  
  48. FttpAdapter fa1 = new FttpAdapter(fttppath);
  49. fras[1]=fa1.getFttpReader(5,5);
  50.  
  51. FttpAdapter fa2 = new FttpAdapter(fttppath);
  52. fras[2]=fa2.getFttpReader(10,5);
  53.  
  54. Result<byte[]>[] rs = this.readAllBatch(fras);
  55.  
  56. System.out.println(new String(rs[0].getResult()));
  57. System.out.println(new String(rs[1].getResult()));
  58. System.out.println(new String(rs[2].getResult()));
  59.  
  60. fa0.close();
  61. fa1.close();
  62. fa2.close();
  63. }catch(FttpException fe){
  64. fe.printStackTrace();
  65. }
  66. }
  67.  
  68. public void fttpBatchReadWrite(){
  69. try{
  70. String readpath = "fttp://192.168.0.1/home/log/1.log";
  71. FttpReadAdapter[] fras = new FttpReadAdapter[3];
  72. FttpAdapter fa0 = new FttpAdapter(readpath);
  73. fras[0]=fa0.getFttpReader(0,5);
  74. FttpAdapter fa1 = new FttpAdapter(readpath);
  75. fras[1]=fa1.getFttpReader(5,5);
  76. FttpAdapter fa2 = new FttpAdapter(readpath);
  77. fras[2]=fa2.getFttpReader(10,5);
  78.  
  79. String writepath = "fttp://192.168.0.1/home/log/2.log";
  80. FttpWriteAdapter[] fwas = new FttpWriteAdapter[3];
  81. FttpAdapter faw0 = new FttpAdapter(writepath);
  82. fwas[0]=faw0.getFttpWriter(0,5);
  83. FttpAdapter faw1 = new FttpAdapter(writepath);
  84. fwas[1]=faw1.getFttpWriter(5,5);
  85. FttpAdapter faw2 = new FttpAdapter(writepath);
  86. fwas[2]=faw2.getFttpWriter(10,5);
  87.  
  88. Result<Integer>[] rs = this.readWriteBatch(fras,fwas);
  89.  
  90. System.out.println(rs[0].getResult());
  91. System.out.println(rs[1].getResult());
  92. System.out.println(rs[2].getResult());
  93.  
  94. fa0.close();
  95. fa1.close();
  96. fa2.close();
  97. faw0.close();
  98. faw1.close();
  99. faw2.close();
  100. }catch(FttpException fe){
  101. fe.printStackTrace();
  102. }
  103. }
  104.  
  105. public Result[] undo(Result[] rtarr){
  106. System.out.println("undo.........");
  107. for(int i=0;i<rtarr.length;i++){
  108. if(rtarr[i].getStatus()==Result.EXCEPTION)
  109. System.out.println("Result index"+i+" Error");
  110. }
  111. return rtarr;
  112. }
  113.  
  114. public static void main(String[] args){
  115. FttpBatchWriteReadDemo fwrd = new FttpBatchWriteReadDemo();
  116. fwrd.fttpBatchWrite();
  117. fwrd.fttpBatchRead();
  118. fwrd.fttpBatchReadWrite();
  119. }
  120. }