Produced by Araxis Merge on 2/7/2017 12:14:07 PM Eastern Standard Time. See www.araxis.com for information about Merge. This report uses XHTML and CSS2, and is best viewed with a modern standards-compliant browser. For optimum results when printing this report, use landscape orientation and enable printing of background images and colours in your browser.
# | Location | File | Last Modified |
---|---|---|---|
1 | cpss.zip\cpss\src\main\java\gov\va\cpss\job | CbssSftpItemReader.java | Wed Feb 1 21:07:26 2017 UTC |
2 | cpss.zip\cpss\src\main\java\gov\va\cpss\job | CbssSftpItemReader.java | Fri Feb 3 20:51:34 2017 UTC |
Description | Text Blocks (Between Files 1 and 2) | Lines (Between Files 1 and 2) |
---|---|---|
Unchanged | 2 | 672 |
Changed | 1 | 2 |
Inserted | 0 | 0 |
Removed | 0 | 0 |
Whitespace | |
---|---|
Character case | Differences in character case are significant |
Line endings | Differences in line endings (CR and LF characters) are ignored |
CR/LF characters | Not shown in the comparison detail |
No regular expressions were active.
1 | package go v.va.cpss. job; | |
2 | ||
3 | import sta tic gov.va .cpss.job. CbssJobPro cessingCon stants.FTP _OPEN_ERRO R_STATUS; | |
4 | import sta tic gov.va .cpss.job. CbssJobPro cessingCon stants.JOB _FAILURE_K EY; | |
5 | import sta tic gov.va .cpss.job. CbssJobPro cessingCon stants.JOB _FAILURE_M ESSAGE_KEY ; | |
6 | import sta tic gov.va .cpss.job. CbssJobPro cessingCon stants.REA D_FAILURE_ STATUS; | |
7 | ||
8 | import jav a.io.Input Stream; | |
9 | import jav a.util.con current.Bl ockingQueu e; | |
10 | import jav a.util.con current.Li nkedBlocki ngQueue; | |
11 | import jav a.util.con current.Ti meoutExcep tion; | |
12 | ||
13 | import org .apache.lo g4j.Logger ; | |
14 | import org .springfra mework.bat ch.core.Ex itStatus; | |
15 | import org .springfra mework.bat ch.core.Jo bExecution ; | |
16 | import org .springfra mework.bat ch.core.St epExecutio n; | |
17 | import org .springfra mework.bat ch.core.St epExecutio nListener; | |
18 | import org .springfra mework.bat ch.item.Pa rseExcepti on; | |
19 | import org .springfra mework.bat ch.item.Un expectedIn putExcepti on; | |
20 | import org .springfra mework.bat ch.item.fi le.FlatFil eParseExce ption; | |
21 | import org .springfra mework.bat ch.item.fi le.LineMap per; | |
22 | import org .springfra mework.bat ch.item.fi le.Resourc eAwareItem ReaderItem Stream; | |
23 | import org .springfra mework.bat ch.item.su pport.Abst ractItemCo untingItem StreamItem Reader; | |
24 | import org .springfra mework.bea ns.factory .Initializ ingBean; | |
25 | import org .springfra mework.cor e.io.Resou rce; | |
26 | import org .springfra mework.uti l.Assert; | |
27 | import org .springfra mework.uti l.ClassUti ls; | |
28 | ||
29 | import gov .va.cpss.s ervice.Sft pService; | |
30 | import gov .va.cpss.s ervice.Sft pStreamSes sion; | |
31 | ||
32 | /** | |
33 | * Impleme ntation of ItemCount ingItemStr eamItemRea der used b y a batch job to | |
34 | * read da ta over an sftp file stream. | |
35 | * | |
36 | * @author D N S | |
37 | */ | |
38 | public abs tract clas s CbssSftp ItemReader <T> extend s Abstract ItemCounti ngItemStre amItemRead er<T> | |
39 | impl ements Res ourceAware ItemReader ItemStream <T>, Initi alizingBea n, StepExe cutionList ener { | |
40 | ||
41 | pr ivate fina l int DEFA ULT_READ_A TTEMPT_COU NT = 30; | |
42 | ||
43 | pr otected st atic final Logger re aderLogger = Logger. getLogger( CbssSftpIt emReader.c lass.getCa nonicalNam e()); | |
44 | ||
45 | pr otected bo olean open ed = false ; | |
46 | ||
47 | pr ivate Sftp Service sf tpService; | |
48 | ||
49 | pr otected Jo bExecution jobExecut ion; | |
50 | ||
51 | pr ivate int lineCount = 0; | |
52 | ||
53 | pr otected Re source res ource; | |
54 | ||
55 | pr ivate Stri ng directo ry; | |
56 | ||
57 | pr ivate Line Mapper<T> lineMapper ; | |
58 | ||
59 | pr otected Bl ockingQueu e<String> dataQueue = new Link edBlocking Queue<>(); | |
60 | ||
61 | pr ivate Sftp StreamSess ion sftpSt reamSessio n; | |
62 | ||
63 | pr otected Cb ssStreamTo QueueThrea d queueBui lderThread ; | |
64 | ||
65 | pu blic CbssS ftpItemRea der() { | |
66 | setN ame(ClassU tils.getSh ortName(Cb ssSftpItem Reader.cla ss)); | |
67 | } | |
68 | ||
69 | pu blic SftpS ervice get SftpServic e() { | |
70 | retu rn sftpSer vice; | |
71 | } | |
72 | ||
73 | pu blic void setSftpSer vice(SftpS ervice sft pService) { | |
74 | this .sftpServi ce = sftpS ervice; | |
75 | } | |
76 | ||
77 | pu blic LineM apper<T> g etLineMapp er() { | |
78 | retu rn lineMap per; | |
79 | } | |
80 | ||
81 | pu blic void setLineMap per(LineMa pper<T> li neMapper) { | |
82 | this .lineMappe r = lineMa pper; | |
83 | } | |
84 | ||
85 | pu blic Strin g getDirec tory() { | |
86 | retu rn directo ry; | |
87 | } | |
88 | ||
89 | pu blic void setDirecto ry(String directory) { | |
90 | this .directory = directo ry; | |
91 | } | |
92 | ||
93 | @O verride | |
94 | pr otected vo id doOpen( ) throws E xception { | |
95 | ||
96 | open ed = false ; | |
97 | ||
98 | try { | |
99 | ||
100 | if (re source.get Filename() != null) { | |
101 | ||
102 | // Get t he size of the file. | |
103 | final Lo ng expecte dByteCount = sftpSer vice.ftpGe tFileSizeI nDirectory (resource. getFilenam e(), direc tory); | |
104 | ||
105 | if (expe ctedByteCo unt != nul l) { | |
106 | ||
107 | readerLogg er.info("A ttempting to read fi le from ft p server: " + resour ce.getFile name()); | |
108 | ||
109 | // Open th e file dow nload stre am session . | |
110 | sftpStream Session = sftpServic e.openFile Stream(res ource.getF ilename(), directory ); | |
111 | ||
112 | if (sftpSt reamSessio n != null) { | |
113 | ||
114 | // Start thr ead to pop ulate queu e. | |
115 | qu eueBuilder Thread = g etQueueBui lderThread (expectedB yteCount, sftpStream Session.ge tInputStre am(), data Queue); | |
116 | if (queueBui lderThread != null) { | |
117 | queu eBuilderTh read.start (); | |
118 | open ed = true; | |
119 | } else { | |
120 | setF ailureStat usAndMessa ge(FTP_OPE N_ERROR_ST ATUS, | |
121 | "Unable to open ft p connecti on because failed to obtain da ta stream processing thread"); | |
122 | } | |
123 | ||
124 | } else { | |
125 | se tFailureSt atusAndMes sage(FTP_O PEN_ERROR_ STATUS, | |
126 | "Unabl e to open ftp connec tion becau se failed to obtain data strea m"); | |
127 | } | |
128 | ||
129 | } else { | |
130 | setFailure StatusAndM essage(FTP _OPEN_ERRO R_STATUS, | |
131 | "Una ble to ope n ftp conn ection bec ause faile d file siz e request" ); | |
132 | } | |
133 | } | |
134 | ||
135 | } ca tch (Excep tion e) { | |
136 | String Builder er ror = new StringBuil der(); | |
137 | error. append("Un able to op en ftp con nection fo r read bec ause of ") ; | |
138 | error. append(e.g etClass(). getSimpleN ame()); | |
139 | error. append("\n Message: " ); | |
140 | error. append(e.g etMessage( )); | |
141 | ||
142 | setFai lureStatus AndMessage (FTP_OPEN_ ERROR_STAT US, error. toString() ); | |
143 | } | |
144 | } | |
145 | ||
146 | @O verride | |
147 | pu blic T rea d() throws Exception , Unexpect edInputExc eption, Pa rseExcepti on { | |
148 | try { | |
149 | ||
150 | return super.rea d(); | |
151 | ||
152 | } ca tch (Excep tion e) { | |
153 | ||
154 | String Builder er ror = new StringBuil der(); | |
155 | error. append("Un able to re ad item be cause of " ); | |
156 | error. append(e.g etClass(). getSimpleN ame()); | |
157 | error. append("\n Message: " ); | |
158 | error. append(e.g etMessage( )); | |
159 | if ((e .getCause( ) != null) && (e.get Cause().ge tMessage() != null)) { | |
160 | error.ap pend("\nCa use: "); | |
161 | error.ap pend(e.get Cause().ge tMessage() .trim()); | |
162 | } | |
163 | ||
164 | setFai lureStatus AndMessage (READ_FAIL URE_STATUS , error.to String()); | |
165 | } | |
166 | // N ull respon se will ca use step t o end. | |
167 | retu rn null; | |
168 | } | |
169 | ||
170 | /* * | |
171 | * Subclass must imple ment metho d for the appropriat e stream t o queue | |
172 | * builder t hread. | |
173 | * | |
174 | * @return T he queue b uilder thr ead. | |
175 | * / | |
176 | pr otected ab stract Cbs sStreamToQ ueueThread getQueueB uilderThre ad(long ex pectedByte Count, Inp utStream i n, | |
177 | Blocki ngQueue<St ring> outp utQueue); | |
178 | ||
179 | @O verride | |
180 | pu blic void beforeStep (StepExecu tion stepE xecution) { | |
181 | // S ave the jo b executio n at the b eginning o f the step . | |
182 | // T he executi on context will be u sed to set exit stat us if a fa ilure | |
183 | // d uring read processin g. | |
184 | jobE xecution = stepExecu tion.getJo bExecution (); | |
185 | } | |
186 | ||
187 | @O verride | |
188 | pu blic ExitS tatus afte rStep(Step Execution stepExecut ion) { | |
189 | // D o not do a nything sp ecial here . | |
190 | retu rn null; | |
191 | } | |
192 | ||
193 | /* * | |
194 | * Set the f ailure and message i n the job execution context. | |
195 | * / | |
196 | pr otected vo id setFail ureStatusA ndMessage( final Stri ng status, final Str ing messag e) { | |
197 | // S et job fai lure. | |
198 | setF ailureStat us(status) ; | |
199 | ||
200 | // S et job fai lure messa ge. | |
201 | setF ailureMess age(messag e); | |
202 | } | |
203 | ||
204 | /* * | |
205 | * Set the f ailure in the job ex ecution co ntext. | |
206 | * / | |
207 | pr ivate void setFailur eStatus(fi nal String status) { | |
208 | // L og job fai lure statu s. | |
209 | read erLogger.e rror("Read failed wi th status: " + statu s); | |
210 | ||
211 | // S et job fai lure. | |
212 | jobE xecution.g etExecutio nContext() .putString (JOB_FAILU RE_KEY, st atus); | |
213 | } | |
214 | ||
215 | /* * | |
216 | * Set the f ailure mes sage in th e job exec ution cont ext. | |
217 | * / | |
218 | pr ivate void setFailur eMessage(f inal Strin g message) { | |
219 | // L og job fai lure messa ge. | |
220 | read erLogger.e rror("Read failure m essage: " + message) ; | |
221 | ||
222 | // S et job fai lure messa ge. | |
223 | jobE xecution.g etExecutio nContext() .putString (JOB_FAILU RE_MESSAGE _KEY, mess age); | |
224 | } | |
225 | ||
226 | @O verride | |
227 | pr otected T doRead() t hrows Exce ption { | |
228 | ||
229 | // I f not open ed then re turn null which will effective ly stop | |
230 | // p rocessing. | |
231 | if ( !opened) { | |
232 | return null; | |
233 | } | |
234 | ||
235 | // L ocal flag to retry r ead until failure or get line. | |
236 | bool ean proces singStream = true; | |
237 | ||
238 | // L ocal count er to retr y so many times befo re declari ng a read failure. | |
239 | int readAttemp tCounter = 0; | |
240 | do { | |
241 | ||
242 | try { | |
243 | // Incre ment the a ttempt cou nter to ma ke sure we do not ge t | |
244 | // stuck infinite loop. | |
245 | ++readAt temptCount er; | |
246 | ||
247 | // Obtai n the next message. | |
248 | String l ine = read Line(); | |
249 | ||
250 | if (line == null) { | |
251 | ||
252 | // If thre ad is not alive or q ueue is em pty then q uit | |
253 | // process ing. | |
254 | // Otherwi se, will a ttempt to read again . | |
255 | if (!queue BuilderThr ead.isAliv e() && dat aQueue.isE mpty()) { | |
256 | pr ocessingSt ream = fal se; | |
257 | } else { | |
258 | re aderLogger .info("Wai ting for q ueue..."); | |
259 | Th read.sleep (1000); | |
260 | } | |
261 | ||
262 | } else { | |
263 | ||
264 | try { | |
265 | // Convert t he line to a mapped object. | |
266 | re turn lineM apper.mapL ine(line, lineCount) ; | |
267 | } catch (E xception e x) { | |
268 | th row new Fl atFilePars eException ("Parsing error at l ine: " + l ineCount + " in reso urce=[" | |
269 | + reso urce.getDe scription( ) + "], in put=[" + l ine + "]", ex, line, lineCount ); | |
270 | } | |
271 | } | |
272 | ||
273 | } catc h (Interru ptedExcept ion e) { | |
274 | readerLo gger.error ("Queueing thread no t dead and will atte mpt retry: " + e.get Message()) ; | |
275 | } | |
276 | ||
277 | // If attempted too many r ead attemp ts then de clare fail ure. | |
278 | if (re adAttemptC ounter > D EFAULT_REA D_ATTEMPT_ COUNT) { | |
279 | final St ring error = "Failed to read f rom data q ueue withi n allocate d number o f retries (" | |
280 | + DEFAULT_RE AD_ATTEMPT _COUNT + " )"; | |
281 | readerLo gger.error (error); | |
282 | processi ngStream = false; | |
283 | throw ne w TimeoutE xception(e rror); | |
284 | } | |
285 | ||
286 | } wh ile (proce ssingStrea m); | |
287 | ||
288 | retu rn null; | |
289 | } | |
290 | ||
291 | /* * | |
292 | * Read the line from the data q ueue. If a n error is encounter ed it shou ld | |
293 | * ripple up and be ca ught by re ad method. | |
294 | * | |
295 | * @return T he next ro w of the f ile. | |
296 | * @throws E xception | |
297 | * / | |
298 | pr otected St ring readL ine() thro ws Excepti on { | |
299 | ||
300 | if ( !dataQueue .isEmpty() ) { | |
301 | lineCo unt++; | |
302 | return dataQueue .take(); | |
303 | } | |
304 | ||
305 | retu rn null; | |
306 | } | |
307 | ||
308 | @O verride | |
309 | pu blic void afterPrope rtiesSet() throws Ex ception { | |
310 | Asse rt.notNull (lineMappe r, "LineMa pper is re quired"); | |
311 | } | |
312 | ||
313 | @O verride | |
314 | pu blic void setResourc e(Resource resource) { | |
315 | this .resource = resource ; | |
316 | } | |
317 | ||
318 | @O verride | |
319 | pr otected vo id doClose () throws Exception { | |
320 | ||
321 | // B e sure to reset the line count . | |
322 | line Count = 0; | |
323 | ||
324 | // S top the qu eue reader thread fr om process ing. | |
325 | // W ait up to 10 seconds for the t hread to s top proces sing. | |
326 | if ( queueBuild erThread ! = null) { | |
327 | queueB uilderThre ad.interru pt(); | |
328 | queueB uilderThre ad.join(10 000); | |
329 | } | |
330 | ||
331 | // C lose the s ession and the input stream. | |
332 | if ( sftpStream Session != null) { | |
333 | sftpSt reamSessio n.close(); | |
334 | } | |
335 | } | |
336 | ||
337 | } |
Araxis Merge (but not the data content of this report) is Copyright © 1993-2016 Araxis Ltd (www.araxis.com). All rights reserved.