Summary Table

Categories Total Count
PII 0
URL 0
DNS 0
EKL 0
IP 0
PORT 0
VsID 0
CF 0
AI 0
VPD 0
PL 0
Other 0

File Content

package gov.va.med.x277ca.file.watcher;

import java.io.File;
import java.nio.file.Path;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import gov.va.med.ars.service.IParse277CAServiceImpl;

@Component
public class IngestFileProcessorPool {

private static final Logger logger = LogManager.getLogger(IngestFileProcessorPool.class);
private static final int DEFAULT_NUMBER_OF_THREADS = 10;

@Autowired
IParse277CAServiceImpl parseService;

@Value("${gov.va.med.x12277ca.file.watcher.processor.threads}")
private String numberOfThreads;

@Value("${gov.va.med.x12277ca.file.watcher.directory.archive}")
String ingestFileArchiveDirectory;

private ExecutorService threadPool;

@PostConstruct
public void postInit() {
checkInitialization();
}

private boolean checkInitialization() {

// initialize archive directory
File directoryToBeCreatedIfNotThere = new File(FilenameUtils.normalize(ingestFileArchiveDirectory));

if (!(directoryToBeCreatedIfNotThere.exists() && directoryToBeCreatedIfNotThere.isDirectory())) {

logger.info("Directory to be archived to does not exist, creating...");

boolean success = directoryToBeCreatedIfNotThere.mkdirs();

if (!success) {
logger.error("Directory to be archived to could not be created");
}
}

// initialize thread pool if not available
if (threadPool == null || threadPool.isShutdown()) {

int initNumberOfThreads = DEFAULT_NUMBER_OF_THREADS;

if (StringUtils.isNotBlank(numberOfThreads) && StringUtils.isNumeric(numberOfThreads.trim())) {
logger.info("Initializing IngestFileProcessorPool with number of threads from properties: "
+ numberOfThreads);
initNumberOfThreads = Integer.parseInt(numberOfThreads);
} else {
logger.info("Initializing IngestFileProcessorPool with default number of threads from class: "
+ numberOfThreads);
}

threadPool = Executors.newFixedThreadPool(initNumberOfThreads);
}

return directoryToBeCreatedIfNotThere.exists() && directoryToBeCreatedIfNotThere.isDirectory()
&& threadPool != null;
}

public void startProcessingReceivedFile(Path receivedFile) {

if (checkInitialization()) {

logger.info("Inside IngestFileProcessorPool.startProcessingReceivedFile, received file: "
+ receivedFile.toString());

IngestFileProcessor fileProcessor = new IngestFileProcessor(receivedFile, ingestFileArchiveDirectory,
parseService);
threadPool.submit(fileProcessor);

} else {
logger.info("Cannot process file located at: " + receivedFile
+ " due to either archive directory not being available or thread pool not available.");
}
}

@PreDestroy
public void close() {
if (threadPool != null && !threadPool.isTerminated()) {
threadPool.shutdown();
try {
if (!threadPool.awaitTermination(1000, TimeUnit.MILLISECONDS)) {
threadPool.shutdownNow();
}
} catch (InterruptedException e) {
if (threadPool != null) {
threadPool.shutdownNow();
}
}
}
}
}