LogHandler.java

package com.saptarshidebnath.lib.processrunner.output;

import static com.saptarshidebnath.lib.processrunner.constants.ProcessRunnerConstants.EMPTY_STR;

import com.saptarshidebnath.lib.processrunner.configuration.Configuration;
import com.saptarshidebnath.lib.processrunner.constants.OutputSourceType;
import com.saptarshidebnath.lib.processrunner.constants.ProcessRunnerConstants;
import com.saptarshidebnath.lib.processrunner.model.OutputRecord;
import com.saptarshidebnath.lib.processrunner.utilities.Threadify;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.InputStream;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.Scanner;
import java.util.StringJoiner;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Writes the process logs as single line json of format {@link OutputRecord} to the file as
 * configured in {@link Configuration#getMasterLogFile()}. If {@link
 * Configuration#getMasterLogFile()} returns null, the logs will not be written.
 */
public class LogHandler {

  private Logger logger = LoggerFactory.getLogger(LogHandler.class);
  private PrintWriter printWriter;
  private BlockingQueue<OutputRecord> queue;
  private boolean streamingEnabled;
  private ArrayList<Future> inputStreamReadingThreads;
  private ExecutorService executorService;
  private Future diskWritingThread;
  private String processConfigrationAsString;
  private File masterLogFile;
  private boolean logsNeedTobeWritten;
  private Process process;
  private Configuration configuration;

  /**
   * The construcor of the class {@link LogHandler}.
   *
   * <p>The class receives a reference of the {@link Process} and {@link Configuration}. The class
   * then internally takes care of creating all the process output reading threads i.e. both {@link
   * OutputSourceType#SYSERR} and {@link OutputSourceType#SYSOUT} and writing the same to the disk
   * in separate thread. The class also provides a blocking method {@link
   * LogHandler#waitForShutdown()} so that you can wait for all logs to be streamed, written to disk
   * or both.
   *
   * <p>The class internally uses an executor service from {@link
   * Threadify#getProcessRunnerExecutorService()}
   *
   * @param process a object of type {@link Process}
   * @param configuration a reference of type {@link Configuration}
   */
  public LogHandler(Process process, Configuration configuration) {
    this.process = process;
    this.configuration = configuration;
    this.masterLogFile = this.configuration.getMasterLogFile();
    this.streamingEnabled = this.configuration.isEnableLogStreaming();
    this.logsNeedTobeWritten = this.configuration.getMasterLogFile() != null;
    this.processConfigrationAsString = this.configuration.toString();
  }

  public LogHandler start() throws FileNotFoundException {

    boolean logsNeedTobeRead = logsNeedTobeWritten || streamingEnabled;
    if (logsNeedTobeRead) {
      this.queue = new LinkedBlockingQueue<>(ProcessRunnerConstants.CACHE_SIZE);
      executorService = new Threadify().getProcessRunnerExecutorService();
      if (streamingEnabled) {
        logger.info("Logs will be streamed on real time.");
      } else {
        logger.warn("Logs streaming disabled.");
      }
      this.inputStreamReadingThreads = new ArrayList<>();
      //
      // Track SYSOUT
      //
      this.saveInpuStreamToDisk(process.getInputStream(), OutputSourceType.SYSOUT);
      //
      // Track SYSERR
      //
      this.saveInpuStreamToDisk(process.getErrorStream(), OutputSourceType.SYSERR);
      //
      // Write content to DISK
      //
      if (logsNeedTobeWritten) {
        this.printWriter =
            new PrintWriter(
                new OutputStreamWriter(
                    new FileOutputStream(masterLogFile), this.configuration.getCharset()));
        this.diskWritingThread = executorService.submit(this::writeToDisk);
      } else {
        logger.warn(
            "Logs not written to file as per configuration : {}", processConfigrationAsString);
      }
      //
      // Mark for shutdown after execution is complete.
      //
      this.executorService.shutdown();
      logger.debug("Created LogHandler. Tracking SYSOUT and SYSERROR");
    } else {
      logger.warn("Log Streaming is not enabled and Master logfile not set. Discarding logs.");
      logger.warn("Configuration received : {}", processConfigrationAsString);
    }
    return this;
  }

  /**
   * Threadify the writing of the inputStream to disk.
   *
   * <p>Internal method and shouldn't be used externally.
   *
   * @param inputStream {@link InputStream} from {@link Process#getInputStream()} and {@link
   *     Process#getErrorStream()}.
   * @param outputSourceType Type of Output as per {@link OutputSourceType}
   */
  private void saveInpuStreamToDisk(InputStream inputStream, OutputSourceType outputSourceType) {
    inputStreamReadingThreads.add(
        this.executorService.submit(() -> this.readInputStream(inputStream, outputSourceType)));
  }

  /**
   * Blocking method to wait for all all 3 threads to be finished. 2 threads are to read {@link
   * OutputSourceType#SYSOUT} and {@link OutputSourceType#SYSERR}. The last thread is to write the
   * read {@link InputStream} to disk.
   *
   * <p>Call this method to wait for the log handler threads to be finished execution.
   *
   * @throws InterruptedException when the {@link ExecutorService#awaitTermination(long, TimeUnit)}
   *     is interrupted by some other {@link Thread}.
   * @throws ExecutionException when waiting for the disk writer to finish.
   */
  public void waitForShutdown() throws InterruptedException, ExecutionException {
    if (this.configuration.getMasterLogFile() != null) {
      logger.info("Waiting for all the logs writing thread to shutdown.");
      //
      // Wait for the disk writing thread to stop.
      //
      this.diskWritingThread.get();
      //
      // Wait for the termination of executor service.
      //
      this.executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.MINUTES);
      logger.info("Waiting for all the logs writing thread to shutdown.");

    } else {
      logger.error("Masterfile Configuration is missing : {}", this.processConfigrationAsString);
      logger.error("Discarding logs.");
    }
  }

  /**
   * Write the content of {@link LogHandler#queue} to the file as recived from {@link Configuration}
   * object.
   *
   * <p>Internal method, shouldn't be used externallyy. Automatically called when a object of {@link
   * LogHandler} is created.
   *
   * @return int depciting the number of lines written.
   * @throws InterruptedException if the disk writing thread is interrupted.
   */
  private int writeToDisk() throws InterruptedException {
    String threadName =
        new StringJoiner(EMPTY_STR)
            .add(Thread.currentThread().getName())
            .add(ProcessRunnerConstants.DISK_WRITER_THREAD_NAME_SUFFIX)
            .toString();
    Thread.currentThread().setName(threadName);
    logger.info("Starting {}  to write to disk", threadName);
    //
    // Look for log while the intputstream still have data or the queue have not
    // finished writing to
    // disk
    //
    int counter = -1;
    while (this.inputStreamReadingThreads.stream().anyMatch(future -> !future.isDone())
        || !queue.isEmpty()) {
      if (queue.isEmpty()) {
        //
        // If queue is empty wait for some time
        //
        logger.debug(
            "Queue is empty, waiting for {} milliseconds", ProcessRunnerConstants.THREAD_WAIT_TIME);
        Thread.sleep(ProcessRunnerConstants.THREAD_WAIT_TIME);
      } else {
        //
        // Write all the element in the queue to the disk.
        //
        List<OutputRecord> record = new ArrayList<>(ProcessRunnerConstants.FILE_WRITER_OBJECT_SIZE);
        int numberElementDrained =
            queue.drainTo(record, ProcessRunnerConstants.FILE_WRITER_OBJECT_SIZE);
        assert numberElementDrained == record.size();
        counter += record.size();
        record.stream().map(ProcessRunnerConstants.GSON::toJson).forEach(printWriter::println);
      }
      //
      // Force flush
      //
      printWriter.flush();
    }
    printWriter.close();
    logger.debug("Wrote {} lines to master log file.", ++counter);
    return counter;
  }

  /**
   * Reads the {@link InputStream} and write them to a {@link LogHandler#queue} as {@link
   * OutputRecord}.
   *
   * <p>This is a internal method and shouldn't be used by any body in the library.
   *
   * @param inputStream the {@link InputStream} to be read. The inputStream is received from {@link
   *     Process#getErrorStream()} and {@link Process#getInputStream()}.
   * @param outputSourceType either as input {@link OutputSourceType#SYSOUT} or {@link
   *     OutputSourceType#SYSERR}.
   */
  private void readInputStream(InputStream inputStream, OutputSourceType outputSourceType) {
    String outputSourceTypeAsString = outputSourceType.toString();
    logger.trace("Saving input stream for : {}", outputSourceTypeAsString);
    String threadName =
        new StringJoiner("")
            .add(Thread.currentThread().getName())
            .add(ProcessRunnerConstants.STREAM_READER_THREAD_NAME_SUFFIX)
            .add(outputSourceType.toString())
            .toString();
    Thread.currentThread().setName(threadName);
    logger.trace("Starting {} to read {}", threadName, outputSourceTypeAsString);
    Scanner scanner = new Scanner(inputStream, Charset.defaultCharset().toString());
    String currentLine;
    String loggingMessage;
    while (scanner.hasNextLine()) {
      currentLine = scanner.nextLine();
      loggingMessage =
          new StringJoiner(" >> ").add(outputSourceType.toString()).add(currentLine).toString();
      if (streamingEnabled) {
        logger.info(loggingMessage);
      } else {
        logger.trace(loggingMessage);
      }
      boolean response = this.queue.add(new OutputRecord(outputSourceType, currentLine));
      assert response;
    }
  }
}