Produced by Araxis Merge on 11/14/2017 6:57:20 AM Central Standard Time. See www.araxis.com for information about Merge. This report uses XHTML and CSS2, and is best viewed with a modern standards-compliant browser. For optimum results when printing this report, use landscape orientation and enable printing of background images and colours in your browser.
| # | Location | File | Last Modified |
|---|---|---|---|
| 1 | cbs.zip\cbs\cbs\src\main\java\gov\va\cpss\job | CbssSftpItemReader.java | Thu Nov 9 14:19:44 2017 UTC |
| 2 | cbs.zip\cbs\cbs\src\main\java\gov\va\cpss\job | CbssSftpItemReader.java | Tue Nov 14 12:25:58 2017 UTC |
| Description | Between Files 1 and 2 | |
|---|---|---|
| | Text Blocks | Lines |
| Unchanged | 2 | 672 |
| Changed | 1 | 2 |
| Inserted | 0 | 0 |
| Removed | 0 | 0 |
| Whitespace | |
|---|---|
| Character case | Differences in character case are significant |
| Line endings | Differences in line endings (CR and LF characters) are ignored |
| CR/LF characters | Not shown in the comparison detail |
No regular expressions were active.
| 1 | package go v.va.cpss. job; | |
| 2 | ||
| 3 | import sta tic gov.va .cpss.job. CbssJobPro cessingCon stants.FTP _OPEN_ERRO R_STATUS; | |
| 4 | import sta tic gov.va .cpss.job. CbssJobPro cessingCon stants.JOB _FAILURE_K EY; | |
| 5 | import sta tic gov.va .cpss.job. CbssJobPro cessingCon stants.JOB _FAILURE_M ESSAGE_KEY ; | |
| 6 | import sta tic gov.va .cpss.job. CbssJobPro cessingCon stants.REA D_FAILURE_ STATUS; | |
| 7 | ||
| 8 | import jav a.io.Input Stream; | |
| 9 | import jav a.util.con current.Bl ockingQueu e; | |
| 10 | import jav a.util.con current.Li nkedBlocki ngQueue; | |
| 11 | import jav a.util.con current.Ti meoutExcep tion; | |
| 12 | ||
| 13 | import org .apache.lo g4j.Logger ; | |
| 14 | import org .springfra mework.bat ch.core.Ex itStatus; | |
| 15 | import org .springfra mework.bat ch.core.Jo bExecution ; | |
| 16 | import org .springfra mework.bat ch.core.St epExecutio n; | |
| 17 | import org .springfra mework.bat ch.core.St epExecutio nListener; | |
| 18 | import org .springfra mework.bat ch.item.Pa rseExcepti on; | |
| 19 | import org .springfra mework.bat ch.item.Un expectedIn putExcepti on; | |
| 20 | import org .springfra mework.bat ch.item.fi le.FlatFil eParseExce ption; | |
| 21 | import org .springfra mework.bat ch.item.fi le.LineMap per; | |
| 22 | import org .springfra mework.bat ch.item.fi le.Resourc eAwareItem ReaderItem Stream; | |
| 23 | import org .springfra mework.bat ch.item.su pport.Abst ractItemCo untingItem StreamItem Reader; | |
| 24 | import org .springfra mework.bea ns.factory .Initializ ingBean; | |
| 25 | import org .springfra mework.cor e.io.Resou rce; | |
| 26 | import org .springfra mework.uti l.Assert; | |
| 27 | import org .springfra mework.uti l.ClassUti ls; | |
| 28 | ||
| 29 | import gov .va.cpss.s ervice.Sft pService; | |
| 30 | import gov .va.cpss.s ervice.Sft pStreamSes sion; | |
| 31 | ||
| 32 | /** | |
| 33 | * Impleme ntation of ItemCount ingItemStr eamItemRea der used b y a batch job to | |
| 34 | * read da ta over an sftp file stream. | |
| 35 | * | |
| 36 | * @author DN S BROWNL | |
| 37 | */ | |
| 38 | public abs tract clas s CbssSftp ItemReader <T> extend s Abstract ItemCounti ngItemStre amItemRead er<T> | |
| 39 | impl ements Res ourceAware ItemReader ItemStream <T>, Initi alizingBea n, StepExe cutionList ener { | |
| 40 | ||
| 41 | pr ivate fina l int DEFA ULT_READ_A TTEMPT_COU NT = 30; | |
| 42 | ||
| 43 | pr otected st atic final Logger re aderLogger = Logger. getLogger( CbssSftpIt emReader.c lass.getCa nonicalNam e()); | |
| 44 | ||
| 45 | pr otected bo olean open ed = false ; | |
| 46 | ||
| 47 | pr ivate Sftp Service sf tpService; | |
| 48 | ||
| 49 | pr otected Jo bExecution jobExecut ion; | |
| 50 | ||
| 51 | pr ivate int lineCount = 0; | |
| 52 | ||
| 53 | pr otected Re source res ource; | |
| 54 | ||
| 55 | pr ivate Stri ng directo ry; | |
| 56 | ||
| 57 | pr ivate Line Mapper<T> lineMapper ; | |
| 58 | ||
| 59 | pr otected Bl ockingQueu e<String> dataQueue = new Link edBlocking Queue<>(); | |
| 60 | ||
| 61 | pr ivate Sftp StreamSess ion sftpSt reamSessio n; | |
| 62 | ||
| 63 | pr otected Cb ssStreamTo QueueThrea d queueBui lderThread ; | |
| 64 | ||
| 65 | pu blic CbssS ftpItemRea der() { | |
| 66 | setN ame(ClassU tils.getSh ortName(Cb ssSftpItem Reader.cla ss)); | |
| 67 | } | |
| 68 | ||
| 69 | pu blic SftpS ervice get SftpServic e() { | |
| 70 | retu rn sftpSer vice; | |
| 71 | } | |
| 72 | ||
| 73 | pu blic void setSftpSer vice(SftpS ervice sft pService) { | |
| 74 | this .sftpServi ce = sftpS ervice; | |
| 75 | } | |
| 76 | ||
| 77 | pu blic LineM apper<T> g etLineMapp er() { | |
| 78 | retu rn lineMap per; | |
| 79 | } | |
| 80 | ||
| 81 | pu blic void setLineMap per(LineMa pper<T> li neMapper) { | |
| 82 | this .lineMappe r = lineMa pper; | |
| 83 | } | |
| 84 | ||
| 85 | pu blic Strin g getDirec tory() { | |
| 86 | retu rn directo ry; | |
| 87 | } | |
| 88 | ||
| 89 | pu blic void setDirecto ry(String directory) { | |
| 90 | this .directory = directo ry; | |
| 91 | } | |
| 92 | ||
| 93 | @O verride | |
| 94 | pr otected vo id doOpen( ) throws E xception { | |
| 95 | ||
| 96 | open ed = false ; | |
| 97 | ||
| 98 | try { | |
| 99 | ||
| 100 | if (re source.get Filename() != null) { | |
| 101 | ||
| 102 | // Get t he size of the file. | |
| 103 | final Lo ng expecte dByteCount = sftpSer vice.ftpGe tFileSizeI nDirectory (resource. getFilenam e(), direc tory); | |
| 104 | ||
| 105 | if (expe ctedByteCo unt != nul l) { | |
| 106 | ||
| 107 | readerLogg er.info("A ttempting to read fi le from ft p server: " + resour ce.getFile name()); | |
| 108 | ||
| 109 | // Open th e file dow nload stre am session . | |
| 110 | sftpStream Session = sftpServic e.openFile Stream(res ource.getF ilename(), directory ); | |
| 111 | ||
| 112 | if (sftpSt reamSessio n != null) { | |
| 113 | ||
| 114 | // Start thr ead to pop ulate queu e. | |
| 115 | qu eueBuilder Thread = g etQueueBui lderThread (expectedB yteCount, sftpStream Session.ge tInputStre am(), data Queue); | |
| 116 | if (queueBui lderThread != null) { | |
| 117 | queu eBuilderTh read.start (); | |
| 118 | open ed = true; | |
| 119 | } else { | |
| 120 | setF ailureStat usAndMessa ge(FTP_OPE N_ERROR_ST ATUS, | |
| 121 | "Unable to open ft p connecti on because failed to obtain da ta stream processing thread"); | |
| 122 | } | |
| 123 | ||
| 124 | } else { | |
| 125 | se tFailureSt atusAndMes sage(FTP_O PEN_ERROR_ STATUS, | |
| 126 | "Unabl e to open ftp connec tion becau se failed to obtain data strea m"); | |
| 127 | } | |
| 128 | ||
| 129 | } else { | |
| 130 | setFailure StatusAndM essage(FTP _OPEN_ERRO R_STATUS, | |
| 131 | "Una ble to ope n ftp conn ection bec ause faile d file siz e request" ); | |
| 132 | } | |
| 133 | } | |
| 134 | ||
| 135 | } ca tch (Excep tion e) { | |
| 136 | String Builder er ror = new StringBuil der(); | |
| 137 | error. append("Un able to op en ftp con nection fo r read bec ause of ") ; | |
| 138 | error. append(e.g etClass(). getSimpleN ame()); | |
| 139 | error. append("\n Message: " ); | |
| 140 | error. append(e.g etMessage( )); | |
| 141 | ||
| 142 | setFai lureStatus AndMessage (FTP_OPEN_ ERROR_STAT US, error. toString() ); | |
| 143 | } | |
| 144 | } | |
| 145 | ||
| 146 | @O verride | |
| 147 | pu blic T rea d() throws Exception , Unexpect edInputExc eption, Pa rseExcepti on { | |
| 148 | try { | |
| 149 | ||
| 150 | return super.rea d(); | |
| 151 | ||
| 152 | } ca tch (Excep tion e) { | |
| 153 | ||
| 154 | String Builder er ror = new StringBuil der(); | |
| 155 | error. append("Un able to re ad item be cause of " ); | |
| 156 | error. append(e.g etClass(). getSimpleN ame()); | |
| 157 | error. append("\n Message: " ); | |
| 158 | error. append(e.g etMessage( )); | |
| 159 | if ((e .getCause( ) != null) && (e.get Cause().ge tMessage() != null)) { | |
| 160 | error.ap pend("\nCa use: "); | |
| 161 | error.ap pend(e.get Cause().ge tMessage() .trim()); | |
| 162 | } | |
| 163 | ||
| 164 | setFai lureStatus AndMessage (READ_FAIL URE_STATUS , error.to String()); | |
| 165 | } | |
| 166 | // N ull respon se will ca use step t o end. | |
| 167 | retu rn null; | |
| 168 | } | |
| 169 | ||
| 170 | /* * | |
| 171 | * Subclass must imple ment metho d for the appropriat e stream t o queue | |
| 172 | * builder t hread. | |
| 173 | * | |
| 174 | * @return T he queue b uilder thr ead. | |
| 175 | * / | |
| 176 | pr otected ab stract Cbs sStreamToQ ueueThread getQueueB uilderThre ad(long ex pectedByte Count, Inp utStream i n, | |
| 177 | Blocki ngQueue<St ring> outp utQueue); | |
| 178 | ||
| 179 | @O verride | |
| 180 | pu blic void beforeStep (StepExecu tion stepE xecution) { | |
| 181 | // S ave the jo b executio n at the b eginning o f the step . | |
| 182 | // T he executi on context will be u sed to set exit stat us if a fa ilure | |
| 183 | // d uring read processin g. | |
| 184 | jobE xecution = stepExecu tion.getJo bExecution (); | |
| 185 | } | |
| 186 | ||
| 187 | @O verride | |
| 188 | pu blic ExitS tatus afte rStep(Step Execution stepExecut ion) { | |
| 189 | // D o not do a nything sp ecial here . | |
| 190 | retu rn null; | |
| 191 | } | |
| 192 | ||
| 193 | /* * | |
| 194 | * Set the f ailure and message i n the job execution context. | |
| 195 | * / | |
| 196 | pr otected vo id setFail ureStatusA ndMessage( final Stri ng status, final Str ing messag e) { | |
| 197 | // S et job fai lure. | |
| 198 | setF ailureStat us(status) ; | |
| 199 | ||
| 200 | // S et job fai lure messa ge. | |
| 201 | setF ailureMess age(messag e); | |
| 202 | } | |
| 203 | ||
| 204 | /* * | |
| 205 | * Set the f ailure in the job ex ecution co ntext. | |
| 206 | * / | |
| 207 | pr ivate void setFailur eStatus(fi nal String status) { | |
| 208 | // L og job fai lure statu s. | |
| 209 | read erLogger.e rror("Read failed wi th status: " + statu s); | |
| 210 | ||
| 211 | // S et job fai lure. | |
| 212 | jobE xecution.g etExecutio nContext() .putString (JOB_FAILU RE_KEY, st atus); | |
| 213 | } | |
| 214 | ||
| 215 | /* * | |
| 216 | * Set the f ailure mes sage in th e job exec ution cont ext. | |
| 217 | * / | |
| 218 | pr ivate void setFailur eMessage(f inal Strin g message) { | |
| 219 | // L og job fai lure messa ge. | |
| 220 | read erLogger.e rror("Read failure m essage: " + message) ; | |
| 221 | ||
| 222 | // S et job fai lure messa ge. | |
| 223 | jobE xecution.g etExecutio nContext() .putString (JOB_FAILU RE_MESSAGE _KEY, mess age); | |
| 224 | } | |
| 225 | ||
| 226 | @O verride | |
| 227 | pr otected T doRead() t hrows Exce ption { | |
| 228 | ||
| 229 | // I f not open ed then re turn null which will effective ly stop | |
| 230 | // p rocessing. | |
| 231 | if ( !opened) { | |
| 232 | return null; | |
| 233 | } | |
| 234 | ||
| 235 | // L ocal flag to retry r ead until failure or get line. | |
| 236 | bool ean proces singStream = true; | |
| 237 | ||
| 238 | // L ocal count er to retr y so many times befo re declari ng a read failure. | |
| 239 | int readAttemp tCounter = 0; | |
| 240 | do { | |
| 241 | ||
| 242 | try { | |
| 243 | // Incre ment the a ttempt cou nter to ma ke sure we do not ge t | |
| 244 | // stuck infinite loop. | |
| 245 | ++readAt temptCount er; | |
| 246 | ||
| 247 | // Obtai n the next message. | |
| 248 | String l ine = read Line(); | |
| 249 | ||
| 250 | if (line == null) { | |
| 251 | ||
| 252 | // If thre ad is not alive or q ueue is em pty then q uit | |
| 253 | // process ing. | |
| 254 | // Otherwi se, will a ttempt to read again . | |
| 255 | if (!queue BuilderThr ead.isAliv e() && dat aQueue.isE mpty()) { | |
| 256 | pr ocessingSt ream = fal se; | |
| 257 | } else { | |
| 258 | re aderLogger .info("Wai ting for q ueue..."); | |
| 259 | Th read.sleep (1000); | |
| 260 | } | |
| 261 | ||
| 262 | } else { | |
| 263 | ||
| 264 | try { | |
| 265 | // Convert t he line to a mapped object. | |
| 266 | re turn lineM apper.mapL ine(line, lineCount) ; | |
| 267 | } catch (E xception e x) { | |
| 268 | th row new Fl atFilePars eException ("Parsing error at l ine: " + l ineCount + " in reso urce=[" | |
| 269 | + reso urce.getDe scription( ) + "], in put=[" + l ine + "]", ex, line, lineCount ); | |
| 270 | } | |
| 271 | } | |
| 272 | ||
| 273 | } catc h (Interru ptedExcept ion e) { | |
| 274 | readerLo gger.error ("Queueing thread no t dead and will atte mpt retry: " + e.get Message()) ; | |
| 275 | } | |
| 276 | ||
| 277 | // If attempted too many r ead attemp ts then de clare fail ure. | |
| 278 | if (re adAttemptC ounter > D EFAULT_REA D_ATTEMPT_ COUNT) { | |
| 279 | final St ring error = "Failed to read f rom data q ueue withi n allocate d number o f retries (" | |
| 280 | + DEFAULT_RE AD_ATTEMPT _COUNT + " )"; | |
| 281 | readerLo gger.error (error); | |
| 282 | processi ngStream = false; | |
| 283 | throw ne w TimeoutE xception(e rror); | |
| 284 | } | |
| 285 | ||
| 286 | } wh ile (proce ssingStrea m); | |
| 287 | ||
| 288 | retu rn null; | |
| 289 | } | |
| 290 | ||
| 291 | /* * | |
| 292 | * Read the line from the data q ueue. If a n error is encounter ed it shou ld | |
| 293 | * ripple up and be ca ught by re ad method. | |
| 294 | * | |
| 295 | * @return T he next ro w of the f ile. | |
| 296 | * @throws E xception | |
| 297 | * / | |
| 298 | pr otected St ring readL ine() thro ws Excepti on { | |
| 299 | ||
| 300 | if ( !dataQueue .isEmpty() ) { | |
| 301 | lineCo unt++; | |
| 302 | return dataQueue .take(); | |
| 303 | } | |
| 304 | ||
| 305 | retu rn null; | |
| 306 | } | |
| 307 | ||
| 308 | @O verride | |
| 309 | pu blic void afterPrope rtiesSet() throws Ex ception { | |
| 310 | Asse rt.notNull (lineMappe r, "LineMa pper is re quired"); | |
| 311 | } | |
| 312 | ||
| 313 | @O verride | |
| 314 | pu blic void setResourc e(Resource resource) { | |
| 315 | this .resource = resource ; | |
| 316 | } | |
| 317 | ||
| 318 | @O verride | |
| 319 | pr otected vo id doClose () throws Exception { | |
| 320 | ||
| 321 | // B e sure to reset the line count . | |
| 322 | line Count = 0; | |
| 323 | ||
| 324 | // S top the qu eue reader thread fr om process ing. | |
| 325 | // W ait up to 10 seconds for the t hread to s top proces sing. | |
| 326 | if ( queueBuild erThread ! = null) { | |
| 327 | queueB uilderThre ad.interru pt(); | |
| 328 | queueB uilderThre ad.join(10 000); | |
| 329 | } | |
| 330 | ||
| 331 | // C lose the s ession and the input stream. | |
| 332 | if ( sftpStream Session != null) { | |
| 333 | sftpSt reamSessio n.close(); | |
| 334 | } | |
| 335 | } | |
| 336 | ||
| 337 | } |
Araxis Merge (but not the data content of this report) is Copyright © 1993-2016 Araxis Ltd (www.araxis.com). All rights reserved.