6.9 批量并行读写远程文件和事务补偿处理
FileBatch类支持批量并行读写操作,包括对FttpAdapter和FileAdapter的支持,它跟并行读写的区别是不需要检查结果,会等到所有并行读写任务全部完成才返回,并在发生异常时提供事务补偿支持。
1.批量并行读
- public Result<byte[]>[] readAllBatch(TryByteReadAdapter[] fras)
实现对多个FttpReadAdapter任务的批量读,输入一个FttpReadAdapter数组,并行进行它们的读取,直到每个FttpReadAdapter读完后,以数组的方式批量输出它们对应的结果,比如:
- FttpReadAdapter[] fras = new FttpReadAdapter[3];
- fras[0]=new FttpAdapter(fttppath).getFttpReader(0,5);
- fras[1]=new FttpAdapter(fttppath).getFttpReader(5,5);
- fras[2]=new FttpAdapter(fttppath).getFttpReader(10,5);
- Result<byte[]>[] rs = new FileBatch().readAllBatch(fras);
上面代码表示并行从3个位置读一个文件内容,等全部读完后,将对应的结果放在一个数组中返回。
2.批量并行写
- FttpWriteAdapter[] fwas = new FttpWriteAdapter[3];
- fwas[0]=new FttpAdapter(fttppath).getFttpWriter(0,5);
- fwas[1]=new FttpAdapter(fttppath).getFttpWriter(5,5);
- fwas[2]=new FttpAdapter(fttppath).getFttpWriter(10,5);
- Result<Integer>[] rs = new FileBatch().writeBatch(fwas, "abcde".getBytes());
上面代码表示并行对一个文件的3个位置写入"abcde"字符,等全部写完后,返回对应的结果数组。
注意
这里与并行读写一样,3个FttpReadAdapter或者FttpWriteAdapter是由3个不同的FttpAdapter生成,而不是同一个生成。
3.批量并行读写
- Result<Integer>[] rs = new FileBatch().readWriteBatch(fras,fwas);
上面的代码表示将前面的批量读和批量写在一个过程中完成,从fras中每个FttpReadAdapter读,然后通过fwas中对应的每FttpWriteAdapter写入,所有读写完成后返回写入结果数组。
4.事务补偿处理
在批量并行读写过程中,如果其中一个FttpReadAdapter或FttpWriteAdapter发生错误,那么框架会进行分布式事务处理,进行两阶段提交,然后调用undo操作进行事务补偿处理,撤销已经产生的改动和影响。
FileBatch类提供了对undo方法的定义:
- public Result[] undo(Result[] rtarr)
rtarr是传入的结果,然后返回执行undo撤销处理后的结果。
比如调用readAllBatch发生错误,FileBatch会将结果传入undo进行撤销操作,然后才返回结果。
因此开发者需要自己实现undo方法的内容,继承FileBatch类覆盖undo方法:
- public Result[] undo(Result[] rtarr){
- for(int i=0;i<rtarr.length;i++){
- if(rtarr[i].getStatus()==Result.EXCEPTION)
- System.out.println("Result index"+i+" Error");
- }
- return rtarr;
- }
上面的undo方法将发生异常的结果的序号输出显示。
所有的批量读写方法都可以以排它的方式进行,只需指定boolean locked参数即可。
另外,除了支持byte批量并行读写外,也支持所有的整型批量并行读写,提供的API和操作几乎类似。
FttpBatchWriteReadDemo演示了一个批量并行读、批量并行写、批量并行读写操作和事务补偿操作。
运行步骤如下:
1)启动ParkServerDemo:
- java -cp fourinone.jar; ParkServerDemo
2)在192.168.0.1机器上启动FttpServer:
- java -cp fourinone.jar; FttpServer 192.168.0.1
3)在本地运行FttpBatchWriteReadDemo:
- java -cp fourinone.jar; FttpMulWriteReadDemo
完整demo源码如下:
- // FttpBatchWriteReadDemo
- import com.fourinone.FttpAdapter;
- import com.fourinone.FttpException;
- import com.fourinone.Result;
- import com.fourinone.FttpAdapter.FttpReadAdapter;
- import com.fourinone.FttpAdapter.FttpWriteAdapter;
- import com.fourinone.FileBatch;
- public class FttpBatchWriteReadDemo extends FileBatch
- {
- public void fttpBatchWrite(){
- try{
- String fttppath = "fttp://192.168.0.1/home/log/1.log";
- FttpWriteAdapter[] fwas = new FttpWriteAdapter[3];
- FttpAdapter fa0 = new FttpAdapter(fttppath);
- fwas[0]=fa0.getFttpWriter(0,5);
- FttpAdapter fa1 = new FttpAdapter(fttppath);
- fwas[1]=fa1.getFttpWriter(5,5);
- FttpAdapter fa2 = new FttpAdapter(fttppath);
- fwas[2]=fa2.getFttpWriter(10,5);
- Result<Integer>[] rs = this.writeBatch(fwas, "abcde".getBytes());
- System.out.println(rs[0].getResult());
- System.out.println(rs[1].getResult());
- System.out.println(rs[2].getResult());
- fa0.close();
- fa1.close();
- fa2.close();
- }catch(FttpException fe){
- fe.printStackTrace();
- }
- }
- public void fttpBatchRead(){
- try{
- String fttppath = "fttp://192.168.0.1/home/log/1.log";
- FttpReadAdapter[] fras = new FttpReadAdapter[3];
- FttpAdapter fa0 = new FttpAdapter(fttppath);
- fras[0]=fa0.getFttpReader(0,5);
- FttpAdapter fa1 = new FttpAdapter(fttppath);
- fras[1]=fa1.getFttpReader(5,5);
- FttpAdapter fa2 = new FttpAdapter(fttppath);
- fras[2]=fa2.getFttpReader(10,5);
- Result<byte[]>[] rs = this.readAllBatch(fras);
- System.out.println(new String(rs[0].getResult()));
- System.out.println(new String(rs[1].getResult()));
- System.out.println(new String(rs[2].getResult()));
- fa0.close();
- fa1.close();
- fa2.close();
- }catch(FttpException fe){
- fe.printStackTrace();
- }
- }
- public void fttpBatchReadWrite(){
- try{
- String readpath = "fttp://192.168.0.1/home/log/1.log";
- FttpReadAdapter[] fras = new FttpReadAdapter[3];
- FttpAdapter fa0 = new FttpAdapter(readpath);
- fras[0]=fa0.getFttpReader(0,5);
- FttpAdapter fa1 = new FttpAdapter(readpath);
- fras[1]=fa1.getFttpReader(5,5);
- FttpAdapter fa2 = new FttpAdapter(readpath);
- fras[2]=fa2.getFttpReader(10,5);
- String writepath = "fttp://192.168.0.1/home/log/2.log";
- FttpWriteAdapter[] fwas = new FttpWriteAdapter[3];
- FttpAdapter faw0 = new FttpAdapter(writepath);
- fwas[0]=faw0.getFttpWriter(0,5);
- FttpAdapter faw1 = new FttpAdapter(writepath);
- fwas[1]=faw1.getFttpWriter(5,5);
- FttpAdapter faw2 = new FttpAdapter(writepath);
- fwas[2]=faw2.getFttpWriter(10,5);
- Result<Integer>[] rs = this.readWriteBatch(fras,fwas);
- System.out.println(rs[0].getResult());
- System.out.println(rs[1].getResult());
- System.out.println(rs[2].getResult());
- fa0.close();
- fa1.close();
- fa2.close();
- faw0.close();
- faw1.close();
- faw2.close();
- }catch(FttpException fe){
- fe.printStackTrace();
- }
- }
- public Result[] undo(Result[] rtarr){
- System.out.println("undo.........");
- for(int i=0;i<rtarr.length;i++){
- if(rtarr[i].getStatus()==Result.EXCEPTION)
- System.out.println("Result index"+i+" Error");
- }
- return rtarr;
- }
- public static void main(String[] args){
- FttpBatchWriteReadDemo fwrd = new FttpBatchWriteReadDemo();
- fwrd.fttpBatchWrite();
- fwrd.fttpBatchRead();
- fwrd.fttpBatchReadWrite();
- }
- }
