Summary Table
Categories |
Total Count |
PII |
0 |
URL |
0 |
DNS |
0 |
EKL |
0 |
IP |
0 |
PORT |
0 |
VsID |
0 |
CF |
0 |
AI |
0 |
VPD |
0 |
PL |
0 |
Other |
0 |
File Content
package gov.va.med.x277ca.file.watcher;
import java.nio.file.*;
import java.nio.file.WatchEvent.Kind;
import static java.nio.file.StandardWatchEventKinds.*;
import java.io.*;
import java.util.concurrent.TimeUnit;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.beans.factory.annotation.Autowired;
/**
 * Watches a single directory for newly created entries and hands each one to the
 * {@link IngestFileProcessorPool}.
 *
 * <p>The directory is taken from the
 * {@code gov.va.med.x12277ca.file.watcher.directory} property and is created if it
 * does not already exist. Only {@code ENTRY_CREATE} events are registered; the
 * directory tree below it is not watched.
 *
 * <p>{@link #run()} polls until {@link #stopWatcher()} is called, the watch key
 * becomes invalid, the watch service is closed, or the polling thread is interrupted.
 */
@Component
public class IngestFileWatcher implements Runnable {
    private static final Logger logger = LogManager.getLogger(IngestFileWatcher.class);

    /** How long a single poll waits for a key before the run flag is checked again. */
    private static final long POLL_TIMEOUT_MILLIS = 100;

    @Autowired
    IngestFileProcessorPool processorPool;

    private final WatchService watcher;
    private final Path path;

    /**
     * Loop guard for {@link #processEvents()}. Declared {@code volatile} so that a
     * {@link #stopWatcher()} call made from another thread is seen by the polling loop.
     */
    private volatile boolean shouldRun;

    /**
     * Narrows a wildcard watch event to a typed one.
     *
     * @param event the event to re-type
     * @param <T> the context type the caller expects
     * @return the same event instance, viewed as {@code WatchEvent<T>}
     */
    @SuppressWarnings("unchecked")
    static <T> WatchEvent<T> cast(WatchEvent<?> event) {
        return (WatchEvent<T>) event;
    }

    /**
     * Creates a WatchService and registers the given directory for
     * {@code ENTRY_CREATE} events, creating the directory first when it is missing.
     *
     * @param ingestDirectoryUnderWatch path of the directory to watch
     * @throws IOException if the watch service cannot be opened or the directory
     *         cannot be registered (for example because it could not be created)
     */
    @Autowired
    IngestFileWatcher(@Value("${gov.va.med.x12277ca.file.watcher.directory}") String ingestDirectoryUnderWatch)
            throws IOException {
        logger.info(String.format("Directory being watched: \"%s\"", ingestDirectoryUnderWatch));

        File directoryToBeCreatedIfNotThere = new File(ingestDirectoryUnderWatch);
        if (!(directoryToBeCreatedIfNotThere.exists() && directoryToBeCreatedIfNotThere.isDirectory())) {
            logger.info("Directory to be watched does not exist, creating...");
            boolean success = directoryToBeCreatedIfNotThere.mkdirs();
            if (!success) {
                // Registration below will fail with an IOException in this case.
                logger.error("Directory to be watched could not be created");
            }
        }

        // Resolve the path before opening the watch service so that an invalid path
        // cannot leave an open service behind.
        this.path = Paths.get(ingestDirectoryUnderWatch);
        this.watcher = FileSystems.getDefault().newWatchService();
        try {
            this.path.register(this.watcher, ENTRY_CREATE);
        } catch (IOException | RuntimeException registrationFailure) {
            // The instance is never handed out when construction fails, so nobody else
            // could close the service; release it here before propagating.
            try {
                this.watcher.close();
            } catch (IOException closeFailure) {
                registrationFailure.addSuppressed(closeFailure);
            }
            throw registrationFailure;
        }
        this.shouldRun = true;
    }

    /** Runs the polling loop; see {@link #processEvents()}. */
    @Override
    public void run() {
        processEvents();
    }

    /**
     * Process all events for keys queued to the watcher.
     *
     * <p>Each {@code ENTRY_CREATE} event is resolved against the watched directory and
     * passed to {@code processorPool.startProcessingReceivedFile}. {@code OVERFLOW}
     * events are skipped. The method returns when the run flag is cleared, the key can
     * no longer be reset, the watch service is closed, or the thread is interrupted.
     */
    void processEvents() {
        try {
            while (this.shouldRun) {
                // Poll with a timeout rather than blocking so the run flag is re-checked.
                WatchKey key = this.watcher.poll(POLL_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
                if (key == null) {
                    continue;
                }
                for (WatchEvent<?> event : key.pollEvents()) {
                    Kind<?> kind = event.kind();
                    // TBD - provide example of how OVERFLOW event is handled
                    if (kind == OVERFLOW) {
                        continue;
                    }
                    // Context for a directory entry event is the file name of the entry.
                    WatchEvent<Path> ev = cast(event);
                    logger.info("Count before Path check: " + ev.count());
                    Path name = ev.context();
                    Path child = path.resolve(name);
                    logger.info((String.format("%s: %s%n", event.kind().name(), child)));
                    if (kind == ENTRY_CREATE) {
                        processorPool.startProcessingReceivedFile(child);
                    }
                }
                // Reset the key so further events are queued; an invalid key means the
                // directory is no longer accessible and watching cannot continue.
                boolean valid = key.reset();
                if (!valid) {
                    logger.warn("Watch key is no longer valid; stopping watcher for " + path);
                    break;
                }
            }
        } catch (InterruptedException x) {
            // Restore the interrupt status so code further up the stack can observe it.
            Thread.currentThread().interrupt();
            logger.info("Watcher thread was interrupted; stopping.");
        } catch (ClosedWatchServiceException ex) {
            logger.info("WatchService was already closed and attempted to retrieve a watch event.");
        }
    }

    /**
     * Requests the polling loop to stop and closes the watch service. A failure to
     * close is logged and not rethrown.
     */
    public void stopWatcher() {
        this.shouldRun = false;
        try {
            watcher.close();
        } catch (IOException e) {
            logger.error(e.getMessage(), e);
        }
    }
}