001package org.unix4j.unix.sort; 002 003import java.util.Comparator; 004import java.util.List; 005 006import org.unix4j.context.ExecutionContext; 007import org.unix4j.io.Input; 008import org.unix4j.line.Line; 009import org.unix4j.processor.LineProcessor; 010 011class MergeProcessor extends AbstractSortProcessor { 012 013 private final List<? extends Input> inputs; 014 015 public MergeProcessor(SortCommand command, ExecutionContext context, LineProcessor output, List<? extends Input> inputs) { 016 super(command, context, output); 017 this.inputs = inputs; 018 } 019 020 @Override 021 public boolean processLine(Line line) { 022 //if lines come from standard input, there is nothing to merge 023 return getOutput().processLine(line); 024 } 025 026 @Override 027 public void finish() { 028 final int len = inputs.size(); 029 final Line[] lines = new Line[len]; 030 for (int i = 0; i < len; i++) { 031 final Input input = inputs.get(i); 032 lines[i] = input.hasMoreLines() ? input.readLine() : null; 033 } 034 final LineProcessor output = getOutput(); 035 final Comparator<? super Line> comparator = getComparator(); 036 while (true) { 037 Line line = null; 038 int inputIndexOfLine = -1; 039 for (int i = 0; i < len; i++) { 040 final Line cur = lines[i]; 041 if (cur != null) { 042 if (inputIndexOfLine < 0 || 0 < comparator.compare(line, cur)) { 043 line = cur; 044 inputIndexOfLine = i; 045 } 046 } 047 } 048 if (line != null) { 049 //write the winner line 050 if (!output.processLine(line)) { 051 output.finish(); 052 closeInputs(); 053 return; 054 } 055 //move to next line for winner input 056 final Input input = inputs.get(inputIndexOfLine); 057 lines[inputIndexOfLine] = input.hasMoreLines() ? input.readLine() : null; 058 } else { 059 //no more lines 060 output.finish(); 061 closeInputs(); 062 return; 063 } 064 } 065 } 066 067 private void closeInputs() { 068 for (final Input input : inputs) { 069 input.close(); 070 } 071 } 072 073}