Summary Table
Categories |
Total Count |
PII |
0 |
URL |
0 |
DNS |
0 |
EKL |
0 |
IP |
0 |
PORT |
0 |
VsID |
0 |
CF |
0 |
AI |
0 |
VPD |
0 |
PL |
0 |
Other |
0 |
File Content
package gov.va.med.x277ca.file.watcher;
import java.io.File;
import java.nio.file.Path;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import gov.va.med.ars.service.IParse277CAServiceImpl;
/**
 * Spring component that owns the fixed-size thread pool used to process received ingest files.
 *
 * <p>After construction, and again before every submission, it makes sure the configured archive
 * directory exists and that a usable (non-shut-down) thread pool is available. Each received file
 * is wrapped in an {@code IngestFileProcessor} and submitted to the pool. The pool is shut down
 * when the bean is destroyed.
 */
@Component
public class IngestFileProcessorPool {

    private static final Logger logger = LogManager.getLogger(IngestFileProcessorPool.class);

    /** Pool size used when the configured value is blank, non-numeric, out of range or not positive. */
    private static final int DEFAULT_NUMBER_OF_THREADS = 10;

    /** How long {@link #close()} waits for submitted tasks before forcing the pool down. */
    private static final long SHUTDOWN_TIMEOUT_MILLIS = 1000L;

    @Autowired
    IParse277CAServiceImpl parseService;

    /** Raw property value; kept as a string so that a malformed value can fall back to the default. */
    @Value("${gov.va.med.x12277ca.file.watcher.processor.threads}")
    private String numberOfThreads;

    @Value("${gov.va.med.x12277ca.file.watcher.directory.archive}")
    String ingestFileArchiveDirectory;

    private ExecutorService threadPool;

    /** Eagerly prepares the archive directory and the thread pool once injection has completed. */
    @PostConstruct
    public void postInit() {
        checkInitialization();
    }

    /**
     * Ensures the archive directory exists and that a live thread pool is available, creating
     * either one when necessary.
     *
     * @return {@code true} when the archive directory exists as a directory and a thread pool is
     *         available; {@code false} otherwise
     */
    private boolean checkInitialization() {
        boolean archiveDirectoryReady = ensureArchiveDirectory();

        // (Re)create the pool if it was never built or has been shut down.
        if (threadPool == null || threadPool.isShutdown()) {
            threadPool = Executors.newFixedThreadPool(resolveNumberOfThreads());
        }
        return archiveDirectoryReady && threadPool != null;
    }

    /**
     * Creates the archive directory (including missing parents) when it is not already present.
     *
     * @return {@code true} when the archive path refers to an existing directory on return
     */
    private boolean ensureArchiveDirectory() {
        // normalize() yields null for a null or invalid path; new File((String) null) would throw.
        String normalizedPath = FilenameUtils.normalize(ingestFileArchiveDirectory);
        if (normalizedPath == null) {
            logger.error("Directory to be archived to is missing or not a valid path: "
                    + ingestFileArchiveDirectory);
            return false;
        }

        File archiveDirectory = new File(normalizedPath);
        if (archiveDirectory.isDirectory()) {
            return true;
        }

        logger.info("Directory to be archived to does not exist, creating...");
        if (!archiveDirectory.mkdirs()) {
            logger.error("Directory to be archived to could not be created");
        }
        // Re-check instead of trusting mkdirs(): the directory may have been created concurrently.
        return archiveDirectory.isDirectory();
    }

    /**
     * Determines the pool size from the configured property.
     *
     * @return the configured number of threads when it is a positive integer, otherwise
     *         {@link #DEFAULT_NUMBER_OF_THREADS}
     */
    private int resolveNumberOfThreads() {
        if (StringUtils.isNotBlank(numberOfThreads)) {
            // Validate and parse the same (trimmed) text so surrounding whitespace cannot
            // pass the check and then break parseInt.
            String trimmed = numberOfThreads.trim();
            if (StringUtils.isNumeric(trimmed)) {
                try {
                    int configured = Integer.parseInt(trimmed);
                    // newFixedThreadPool rejects sizes <= 0, so only accept positive values.
                    if (configured > 0) {
                        logger.info("Initializing IngestFileProcessorPool with number of threads from properties: "
                                + configured);
                        return configured;
                    }
                } catch (NumberFormatException e) {
                    // All digits but too large for an int: fall through to the default.
                }
            }
            logger.warn("Ignoring invalid number of threads from properties: " + numberOfThreads);
        }
        logger.info("Initializing IngestFileProcessorPool with default number of threads from class: "
                + DEFAULT_NUMBER_OF_THREADS);
        return DEFAULT_NUMBER_OF_THREADS;
    }

    /**
     * Submits the received file to the thread pool for processing.
     *
     * <p>If the archive directory or the thread pool is unavailable the file is not submitted and
     * an error is logged instead.
     *
     * @param receivedFile path of the file to process
     */
    public void startProcessingReceivedFile(Path receivedFile) {
        if (!checkInitialization()) {
            logger.error("Cannot process file located at: " + receivedFile
                    + " due to either archive directory not being available or thread pool not available.");
            return;
        }

        logger.info("Inside IngestFileProcessorPool.startProcessingReceivedFile, received file: "
                + receivedFile.toString());
        IngestFileProcessor fileProcessor = new IngestFileProcessor(receivedFile, ingestFileArchiveDirectory,
                parseService);
        threadPool.submit(fileProcessor);
    }

    /**
     * Shuts the thread pool down when the bean is destroyed: waits briefly for submitted tasks,
     * then forces shutdown if they have not finished.
     */
    @PreDestroy
    public void close() {
        if (threadPool == null || threadPool.isTerminated()) {
            return;
        }

        threadPool.shutdown();
        try {
            if (!threadPool.awaitTermination(SHUTDOWN_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                threadPool.shutdownNow();
            }
        } catch (InterruptedException e) {
            threadPool.shutdownNow();
            // Restore the interrupt flag so the caller can still observe the interruption.
            Thread.currentThread().interrupt();
        }
    }
}