48. EPMO Open Source Coordination Office Redaction File Detail Report

Produced by Araxis Merge on 2/7/2017 12:14:07 PM Eastern Standard Time. See www.araxis.com for information about Merge. This report uses XHTML and CSS2, and is best viewed with a modern standards-compliant browser. For optimum results when printing this report, use landscape orientation and enable printing of background images and colours in your browser.

48.1 Files compared

# Location File Last Modified
1 cpss.zip\cpss\src\main\java\gov\va\cpss\job CbssSftpItemReader.java Wed Feb 1 21:07:26 2017 UTC
2 cpss.zip\cpss\src\main\java\gov\va\cpss\job CbssSftpItemReader.java Fri Feb 3 20:51:34 2017 UTC

48.2 Comparison summary

Description Between
Files 1 and 2
Text Blocks Lines
Unchanged 2 672
Changed 1 2
Inserted 0 0
Removed 0 0

48.3 Comparison options

Whitespace
Character case Differences in character case are significant
Line endings Differences in line endings (CR and LF characters) are ignored
CR/LF characters Not shown in the comparison detail

48.4 Active regular expressions

No regular expressions were active.

48.5 Comparison detail

  1   package go v.va.cpss. job;
  2  
  3   import sta tic gov.va .cpss.job. CbssJobPro cessingCon stants.FTP _OPEN_ERRO R_STATUS;
  4   import sta tic gov.va .cpss.job. CbssJobPro cessingCon stants.JOB _FAILURE_K EY;
  5   import sta tic gov.va .cpss.job. CbssJobPro cessingCon stants.JOB _FAILURE_M ESSAGE_KEY ;
  6   import sta tic gov.va .cpss.job. CbssJobPro cessingCon stants.REA D_FAILURE_ STATUS;
  7  
  8   import jav a.io.Input Stream;
  9   import jav a.util.con current.Bl ockingQueu e;
  10   import jav a.util.con current.Li nkedBlocki ngQueue;
  11   import jav a.util.con current.Ti meoutExcep tion;
  12  
  13   import org .apache.lo g4j.Logger ;
  14   import org .springfra mework.bat ch.core.Ex itStatus;
  15   import org .springfra mework.bat ch.core.Jo bExecution ;
  16   import org .springfra mework.bat ch.core.St epExecutio n;
  17   import org .springfra mework.bat ch.core.St epExecutio nListener;
  18   import org .springfra mework.bat ch.item.Pa rseExcepti on;
  19   import org .springfra mework.bat ch.item.Un expectedIn putExcepti on;
  20   import org .springfra mework.bat ch.item.fi le.FlatFil eParseExce ption;
  21   import org .springfra mework.bat ch.item.fi le.LineMap per;
  22   import org .springfra mework.bat ch.item.fi le.Resourc eAwareItem ReaderItem Stream;
  23   import org .springfra mework.bat ch.item.su pport.Abst ractItemCo untingItem StreamItem Reader;
  24   import org .springfra mework.bea ns.factory .Initializ ingBean;
  25   import org .springfra mework.cor e.io.Resou rce;
  26   import org .springfra mework.uti l.Assert;
  27   import org .springfra mework.uti l.ClassUti ls;
  28  
  29   import gov .va.cpss.s ervice.Sft pService;
  30   import gov .va.cpss.s ervice.Sft pStreamSes sion;
  31  
  32   /**
  33    * Impleme ntation of  ItemCount ingItemStr eamItemRea der used b y a batch  job to
  34    * read da ta over an  sftp file  stream.
  35    * 
  36    * @author   D N   
  37    */
  38   public abs tract clas s CbssSftp ItemReader <T> extend s Abstract ItemCounti ngItemStre amItemRead er<T>
  39                    impl ements Res ourceAware ItemReader ItemStream <T>, Initi alizingBea n, StepExe cutionList ener {
  40  
  41           pr ivate fina l int DEFA ULT_READ_A TTEMPT_COU NT = 30;
  42  
  43           pr otected st atic final  Logger re aderLogger  = Logger. getLogger( CbssSftpIt emReader.c lass.getCa nonicalNam e());
  44  
  45           pr otected bo olean open ed = false ;
  46  
  47           pr ivate Sftp Service sf tpService;
  48  
  49           pr otected Jo bExecution  jobExecut ion;
  50  
  51           pr ivate int  lineCount  = 0;
  52  
  53           pr otected Re source res ource;
  54  
  55           pr ivate Stri ng directo ry;
  56  
  57           pr ivate Line Mapper<T>  lineMapper ;
  58  
  59           pr otected Bl ockingQueu e<String>  dataQueue  = new Link edBlocking Queue<>();
  60           
  61           pr ivate Sftp StreamSess ion sftpSt reamSessio n;
  62  
  63           pr otected Cb ssStreamTo QueueThrea d queueBui lderThread ;
  64  
  65           pu blic CbssS ftpItemRea der() {
  66                    setN ame(ClassU tils.getSh ortName(Cb ssSftpItem Reader.cla ss));
  67           }
  68  
  69           pu blic SftpS ervice get SftpServic e() {
  70                    retu rn sftpSer vice;
  71           }
  72  
  73           pu blic void  setSftpSer vice(SftpS ervice sft pService)  {
  74                    this .sftpServi ce = sftpS ervice;
  75           }
  76  
  77           pu blic LineM apper<T> g etLineMapp er() {
  78                    retu rn lineMap per;
  79           }
  80  
  81           pu blic void  setLineMap per(LineMa pper<T> li neMapper)  {
  82                    this .lineMappe r = lineMa pper;
  83           }
  84  
  85           pu blic Strin g getDirec tory() {
  86                    retu rn directo ry;
  87           }
  88  
  89           pu blic void  setDirecto ry(String  directory)  {
  90                    this .directory  = directo ry;
  91           }
  92  
  93           @O verride
  94           pr otected vo id doOpen( ) throws E xception {
  95  
  96                    open ed = false ;
  97  
  98                    try  {
  99  
  100                             if (re source.get Filename()  != null)  {
  101  
  102                                      // Get t he size of  the file.
  103                                      final Lo ng expecte dByteCount  = sftpSer vice.ftpGe tFileSizeI nDirectory (resource. getFilenam e(), direc tory);
  104  
  105                                      if (expe ctedByteCo unt != nul l) {
  106  
  107                                               readerLogg er.info("A ttempting  to read fi le from ft p server:  " + resour ce.getFile name());
  108  
  109                                               // Open th e file dow nload stre am session .
  110                                               sftpStream Session =  sftpServic e.openFile Stream(res ource.getF ilename(),  directory );
  111  
  112                                               if (sftpSt reamSessio n != null)  {
  113  
  114                                                       //  Start thr ead to pop ulate queu e.
  115                                                       qu eueBuilder Thread = g etQueueBui lderThread (expectedB yteCount,  sftpStream Session.ge tInputStre am(), data Queue);
  116                                                       if  (queueBui lderThread  != null)  {
  117                                                                queu eBuilderTh read.start ();
  118                                                                open ed = true;
  119                                                       }  else {
  120                                                                setF ailureStat usAndMessa ge(FTP_OPE N_ERROR_ST ATUS,
  121                                                                                  "Unable  to open ft p connecti on because  failed to  obtain da ta stream  processing  thread");
  122                                                       }
  123  
  124                                               } else {
  125                                                       se tFailureSt atusAndMes sage(FTP_O PEN_ERROR_ STATUS,
  126                                                                         "Unabl e to open  ftp connec tion becau se failed  to obtain  data strea m");
  127                                               }
  128  
  129                                      } else {
  130                                               setFailure StatusAndM essage(FTP _OPEN_ERRO R_STATUS,
  131                                                                "Una ble to ope n ftp conn ection bec ause faile d file siz e request" );
  132                                      }
  133                             }
  134  
  135                    } ca tch (Excep tion e) {
  136                             String Builder er ror = new  StringBuil der();
  137                             error. append("Un able to op en ftp con nection fo r read bec ause of ") ;
  138                             error. append(e.g etClass(). getSimpleN ame());
  139                             error. append("\n Message: " );
  140                             error. append(e.g etMessage( ));
  141  
  142                             setFai lureStatus AndMessage (FTP_OPEN_ ERROR_STAT US, error. toString() );
  143                    }
  144           }
  145  
  146           @O verride
  147           pu blic T rea d() throws  Exception , Unexpect edInputExc eption, Pa rseExcepti on {
  148                    try  {
  149  
  150                             return  super.rea d();
  151  
  152                    } ca tch (Excep tion e) {
  153  
  154                             String Builder er ror = new  StringBuil der();
  155                             error. append("Un able to re ad item be cause of " );
  156                             error. append(e.g etClass(). getSimpleN ame());
  157                             error. append("\n Message: " );
  158                             error. append(e.g etMessage( ));
  159                             if ((e .getCause( ) != null)  && (e.get Cause().ge tMessage()  != null))  {
  160                                      error.ap pend("\nCa use: ");
  161                                      error.ap pend(e.get Cause().ge tMessage() .trim());
  162                             }
  163  
  164                             setFai lureStatus AndMessage (READ_FAIL URE_STATUS , error.to String());
  165                    }
  166                    // N ull respon se will ca use step t o end.
  167                    retu rn null;
  168           }
  169  
  170           /* *
  171            *  Subclass  must imple ment metho d for the  appropriat e stream t o queue
  172            *  builder t hread.
  173            *  
  174            *  @return T he queue b uilder thr ead.
  175            * /
  176           pr otected ab stract Cbs sStreamToQ ueueThread  getQueueB uilderThre ad(long ex pectedByte Count, Inp utStream i n,
  177                             Blocki ngQueue<St ring> outp utQueue);
  178  
  179           @O verride
  180           pu blic void  beforeStep (StepExecu tion stepE xecution)  {
  181                    // S ave the jo b executio n at the b eginning o f the step .
  182                    // T he executi on context  will be u sed to set  exit stat us if a fa ilure
  183                    // d uring read  processin g.
  184                    jobE xecution =  stepExecu tion.getJo bExecution ();
  185           }
  186  
  187           @O verride
  188           pu blic ExitS tatus afte rStep(Step Execution  stepExecut ion) {
  189                    // D o not do a nything sp ecial here .
  190                    retu rn null;
  191           }
  192  
  193           /* *
  194            *  Set the f ailure and  message i n the job  execution  context.
  195            * /
  196           pr otected vo id setFail ureStatusA ndMessage( final Stri ng status,  final Str ing messag e) {
  197                    // S et job fai lure.
  198                    setF ailureStat us(status) ;
  199  
  200                    // S et job fai lure messa ge.
  201                    setF ailureMess age(messag e);
  202           }
  203  
  204           /* *
  205            *  Set the f ailure in  the job ex ecution co ntext.
  206            * /
  207           pr ivate void  setFailur eStatus(fi nal String  status) {
  208                    // L og job fai lure statu s.
  209                    read erLogger.e rror("Read  failed wi th status:  " + statu s);
  210  
  211                    // S et job fai lure.
  212                    jobE xecution.g etExecutio nContext() .putString (JOB_FAILU RE_KEY, st atus);
  213           }
  214  
  215           /* *
  216            *  Set the f ailure mes sage in th e job exec ution cont ext.
  217            * /
  218           pr ivate void  setFailur eMessage(f inal Strin g message)  {
  219                    // L og job fai lure messa ge.
  220                    read erLogger.e rror("Read  failure m essage: "  + message) ;
  221  
  222                    // S et job fai lure messa ge.
  223                    jobE xecution.g etExecutio nContext() .putString (JOB_FAILU RE_MESSAGE _KEY, mess age);
  224           }
  225  
  226           @O verride
  227           pr otected T  doRead() t hrows Exce ption {
  228  
  229                    // I f not open ed then re turn null  which will  effective ly stop
  230                    // p rocessing.
  231                    if ( !opened) {
  232                             return  null;
  233                    }
  234  
  235                    // L ocal flag  to retry r ead until  failure or  get line.
  236                    bool ean proces singStream  = true;
  237  
  238                    // L ocal count er to retr y so many  times befo re declari ng a read  failure.
  239                    int  readAttemp tCounter =  0;
  240                    do {
  241  
  242                             try {
  243                                      // Incre ment the a ttempt cou nter to ma ke sure we  do not ge t
  244                                      // stuck  infinite  loop.
  245                                      ++readAt temptCount er;
  246  
  247                                      // Obtai n the next  message.
  248                                      String l ine = read Line();
  249  
  250                                      if (line  == null)  {
  251  
  252                                               // If thre ad is not  alive or q ueue is em pty then q uit
  253                                               // process ing.
  254                                               // Otherwi se, will a ttempt to  read again .
  255                                               if (!queue BuilderThr ead.isAliv e() && dat aQueue.isE mpty()) {
  256                                                       pr ocessingSt ream = fal se;
  257                                               } else {
  258                                                       re aderLogger .info("Wai ting for q ueue...");
  259                                                       Th read.sleep (1000);
  260                                               }
  261  
  262                                      } else {
  263  
  264                                               try {
  265                                                       //  Convert t he line to  a mapped  object.
  266                                                       re turn lineM apper.mapL ine(line,  lineCount) ;
  267                                               } catch (E xception e x) {
  268                                                       th row new Fl atFilePars eException ("Parsing  error at l ine: " + l ineCount +  " in reso urce=["
  269                                                                         + reso urce.getDe scription( ) + "], in put=[" + l ine + "]",  ex, line,  lineCount );
  270                                               }
  271                                      }
  272  
  273                             } catc h (Interru ptedExcept ion e) {
  274                                      readerLo gger.error ("Queueing  thread no t dead and  will atte mpt retry:  " + e.get Message()) ;
  275                             }
  276  
  277                             // If  attempted  too many r ead attemp ts then de clare fail ure.
  278                             if (re adAttemptC ounter > D EFAULT_REA D_ATTEMPT_ COUNT) {
  279                                      final St ring error  = "Failed  to read f rom data q ueue withi n allocate d number o f retries  ("
  280                                                       +  DEFAULT_RE AD_ATTEMPT _COUNT + " )";
  281                                      readerLo gger.error (error);
  282                                      processi ngStream =  false;
  283                                      throw ne w TimeoutE xception(e rror);
  284                             }
  285  
  286                    } wh ile (proce ssingStrea m);
  287  
  288                    retu rn null;
  289           }
  290  
  291           /* *
  292            *  Read the  line from  the data q ueue. If a n error is  encounter ed it shou ld
  293            *  ripple up  and be ca ught by re ad method.
  294            *  
  295            *  @return T he next ro w of the f ile.
  296            *  @throws E xception
  297            * /
  298           pr otected St ring readL ine() thro ws Excepti on {
  299  
  300                    if ( !dataQueue .isEmpty() ) {
  301                             lineCo unt++;
  302                             return  dataQueue .take();
  303                    }
  304  
  305                    retu rn null;
  306           }
  307  
  308           @O verride
  309           pu blic void  afterPrope rtiesSet()  throws Ex ception {
  310                    Asse rt.notNull (lineMappe r, "LineMa pper is re quired");
  311           }
  312  
  313           @O verride
  314           pu blic void  setResourc e(Resource  resource)  {
  315                    this .resource  = resource ;
  316           }
  317  
  318           @O verride
  319           pr otected vo id doClose () throws  Exception  {
  320                    
  321                    // B e sure to  reset the  line count .
  322                    line Count = 0;
  323  
  324                    // S top the qu eue reader  thread fr om process ing.
  325                    // W ait up to  10 seconds  for the t hread to s top proces sing.
  326                    if ( queueBuild erThread ! = null) {
  327                             queueB uilderThre ad.interru pt();
  328                             queueB uilderThre ad.join(10 000);
  329                    }
  330                    
  331                    // C lose the s ession and  the input  stream.
  332                    if ( sftpStream Session !=  null) {
  333                             sftpSt reamSessio n.close();
  334                    }
  335           }
  336  
  337   }