/*
 * Copyright (c) 1995, 2013, Oracle and/or its affiliates. All rights reserved.
 * Copyright (C) 2014-2020 MicroEJ Corp. - EDC compliance and optimizations.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

package java.io;

import ej.annotation.Nullable;

/**
 * A piped input stream should be connected to a piped output stream; the piped input stream then provides whatever data
 * bytes are written to the piped output stream. Typically, data is read from a <code>PipedInputStream</code> object by
 * one thread and data is written to the corresponding <code>PipedOutputStream</code> by some other thread. Attempting
 * to use both objects from a single thread is not recommended, as it may deadlock the thread. The piped input stream
 * contains a buffer, decoupling read operations from write operations, within limits. A pipe is said to be
 * <a name="BROKEN"> <i>broken</i> </a> if a thread that was providing data bytes to the connected piped output stream
 * is no longer alive.
 *
 * @author James Gosling
 * @see java.io.PipedOutputStream
 * @since JDK1.0
 */
public class PipedInputStream extends InputStream {
	boolean closedByWriter = false;
	volatile boolean closedByReader = false;
	boolean connected = false;

	/*
	 * REMIND: identification of the read and write sides needs to be more sophisticated. Either using thread groups
	 * (but what about pipes within a thread?) or using finalization (but it may be a long time until the next GC).
	 */
	@Nullable
	Thread readSide;
	@Nullable
	Thread writeSide;

	private static final int DEFAULT_PIPE_SIZE = 1024;

	/**
	 * The default size of the pipe's circular input buffer.
	 *
	 * @since JDK1.1
	 */
	// This used to be a constant before the pipe size was allowed
	// to change. This field will continue to be maintained
	// for backward compatibility.
	protected static final int PIPE_SIZE = DEFAULT_PIPE_SIZE;

	/**
	 * The circular buffer into which incoming data is placed.
	 *
	 * @since JDK1.1
	 */
	protected byte buffer[];

	/**
	 * The index of the position in the circular buffer at which the next byte of data will be stored when received from
	 * the connected piped output stream. <code>in&lt;0</code> implies the buffer is empty, <code>in==out</code> implies
	 * the buffer is full
	 *
	 * @since JDK1.1
	 */
	protected int in = -1;

	/**
	 * The index of the position in the circular buffer at which the next byte of data will be read by this piped input
	 * stream.
	 *
	 * @since JDK1.1
	 */
	protected int out = 0;

	/**
	 * Creates a <code>PipedInputStream</code> so that it is connected to the piped output stream <code>src</code>. Data
	 * bytes written to <code>src</code> will then be available as input from this stream.
	 *
	 * @param src
	 *            the stream to connect to.
	 * @exception IOException
	 *                if an I/O error occurs.
	 */
	public PipedInputStream(PipedOutputStream src) throws IOException {
		this(src, DEFAULT_PIPE_SIZE);
	}

	/**
	 * Creates a <code>PipedInputStream</code> so that it is connected to the piped output stream <code>src</code> and
	 * uses the specified pipe size for the pipe's buffer. Data bytes written to <code>src</code> will then be available
	 * as input from this stream.
	 *
	 * @param src
	 *            the stream to connect to.
	 * @param pipeSize
	 *            the size of the pipe's buffer.
	 * @exception IOException
	 *                if an I/O error occurs.
	 * @exception IllegalArgumentException
	 *                if {@code pipeSize <= 0}.
	 * @since 1.6
	 */
	public PipedInputStream(PipedOutputStream src, int pipeSize) throws IOException {
		// inlined from initPipe(pipeSize)
		if (pipeSize <= 0) {
			throw new IllegalArgumentException("Pipe Size <= 0");
		}
		this.buffer = new byte[pipeSize];

		connect(src);
	}

	/**
	 * Creates a <code>PipedInputStream</code> so that it is not yet {@linkplain #connect(java.io.PipedOutputStream)
	 * connected}. It must be {@linkplain java.io.PipedOutputStream#connect( java.io.PipedInputStream) connected} to a
	 * <code>PipedOutputStream</code> before being used.
	 */
	public PipedInputStream() {
		// inlined from initPipe(pipeSize)
		this.buffer = new byte[DEFAULT_PIPE_SIZE];
	}

	/**
	 * Creates a <code>PipedInputStream</code> so that it is not yet {@linkplain #connect(java.io.PipedOutputStream)
	 * connected} and uses the specified pipe size for the pipe's buffer. It must be
	 * {@linkplain java.io.PipedOutputStream#connect( java.io.PipedInputStream) connected} to a
	 * <code>PipedOutputStream</code> before being used.
	 *
	 * @param pipeSize
	 *            the size of the pipe's buffer.
	 * @exception IllegalArgumentException
	 *                if {@code pipeSize <= 0}.
	 * @since 1.6
	 */
	public PipedInputStream(int pipeSize) {
		// inlined from initPipe(pipeSize)
		if (pipeSize <= 0) {
			throw new IllegalArgumentException("Pipe Size <= 0");
		}
		this.buffer = new byte[pipeSize];
	}

	/**
	 * Causes this piped input stream to be connected to the piped output stream <code>src</code>. If this object is
	 * already connected to some other piped output stream, an <code>IOException</code> is thrown.
	 * <p>
	 * If <code>src</code> is an unconnected piped output stream and <code>snk</code> is an unconnected piped input
	 * stream, they may be connected by either the call:
	 *
	 * <pre>
	 * <code>snk.connect(src)</code>
	 * </pre>
	 * <p>
	 * or the call:
	 *
	 * <pre>
	 * <code>src.connect(snk)</code>
	 * </pre>
	 * <p>
	 * The two calls have the same effect.
	 *
	 * @param src
	 *            The piped output stream to connect to.
	 * @exception IOException
	 *                if an I/O error occurs.
	 */
	public void connect(PipedOutputStream src) throws IOException {
		src.connect(this);
	}

	/**
	 * Receives a byte of data. This method will block if no input is available.
	 *
	 * @param b
	 *            the byte being received
	 * @exception IOException
	 *                If the pipe is <a href="#BROKEN"> <code>broken</code></a>,
	 *                {@link #connect(java.io.PipedOutputStream) unconnected}, closed, or if an I/O error occurs.
	 * @since JDK1.1
	 */
	protected synchronized void receive(int b) throws IOException {
		checkStateForReceive();
		this.writeSide = Thread.currentThread();
		if (this.in == this.out) {
			awaitSpace();
		}
		if (this.in < 0) {
			this.in = 0;
			this.out = 0;
		}
		this.buffer[this.in++] = (byte) (b & 0xFF);
		if (this.in >= this.buffer.length) {
			this.in = 0;
		}
	}

	/**
	 * Receives data into an array of bytes. This method will block until some input is available.
	 *
	 * @param b
	 *            the buffer into which the data is received
	 * @param off
	 *            the start offset of the data
	 * @param len
	 *            the maximum number of bytes received
	 * @exception IOException
	 *                If the pipe is <a href="#BROKEN"> broken</a>, {@link #connect(java.io.PipedOutputStream)
	 *                unconnected}, closed,or if an I/O error occurs.
	 */
	synchronized void receive(byte b[], int off, int len) throws IOException {
		checkStateForReceive();
		this.writeSide = Thread.currentThread();
		int bytesToTransfer = len;
		while (bytesToTransfer > 0) {
			if (this.in == this.out) {
				awaitSpace();
			}
			int nextTransferAmount = 0;
			if (this.out < this.in) {
				nextTransferAmount = this.buffer.length - this.in;
			} else if (this.in < this.out) {
				if (this.in == -1) {
					this.in = this.out = 0;
					nextTransferAmount = this.buffer.length - this.in;
				} else {
					nextTransferAmount = this.out - this.in;
				}
			}
			if (nextTransferAmount > bytesToTransfer) {
				nextTransferAmount = bytesToTransfer;
			}
			assert (nextTransferAmount > 0);
			System.arraycopy(b, off, this.buffer, this.in, nextTransferAmount);
			bytesToTransfer -= nextTransferAmount;
			off += nextTransferAmount;
			this.in += nextTransferAmount;
			if (this.in >= this.buffer.length) {
				this.in = 0;
			}
		}
	}

	private void checkStateForReceive() throws IOException {
		if (!this.connected) {
			throw new IOException("Pipe not connected");
		} else if (this.closedByWriter || this.closedByReader) {
			throw new IOException("Pipe closed");
		} else {
			Thread readSide = this.readSide;
			if (readSide != null && !readSide.isAlive()) {
				throw new IOException("Read end dead");
			}
		}
	}

	private void awaitSpace() throws IOException {
		while (this.in == this.out) {
			checkStateForReceive();

			/* full: kick any waiting readers */
			notifyAll();
			try {
				wait(1000);
			} catch (InterruptedException ex) {
				throw new java.io.InterruptedIOException();
			}
		}
	}

	/**
	 * Notifies all waiting threads that the last byte of data has been received.
	 */
	synchronized void receivedLast() {
		this.closedByWriter = true;
		notifyAll();
	}

	/**
	 * Reads the next byte of data from this piped input stream. The value byte is returned as an <code>int</code> in
	 * the range <code>0</code> to <code>255</code>. This method blocks until input data is available, the end of the
	 * stream is detected, or an exception is thrown.
	 *
	 * @return the next byte of data, or <code>-1</code> if the end of the stream is reached.
	 * @exception IOException
	 *                if the pipe is {@link #connect(java.io.PipedOutputStream) unconnected}, <a href="#BROKEN">
	 *                <code>broken</code></a>, closed, or if an I/O error occurs.
	 */
	@Override

	public synchronized int read() throws IOException {
		Thread writeSide = this.writeSide;
		if (!this.connected) {
			throw new IOException("Pipe not connected");
		} else if (this.closedByReader) {
			throw new IOException("Pipe closed");
		} else if (writeSide != null && !writeSide.isAlive() && !this.closedByWriter && (this.in < 0)) {
			throw new IOException("Write end dead");
		}

		this.readSide = Thread.currentThread();
		int trials = 2;
		while (this.in < 0) {
			if (this.closedByWriter) {
				/* closed by writer, return EOF */
				return -1;
			}
			if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) {
				throw new IOException("Pipe broken");
			}
			/* might be a writer waiting */
			notifyAll();
			try {
				wait(1000);
			} catch (InterruptedException ex) {
				throw new java.io.InterruptedIOException();
			}
		}
		int ret = this.buffer[this.out++] & 0xFF;
		if (this.out >= this.buffer.length) {
			this.out = 0;
		}
		if (this.in == this.out) {
			/* now empty */
			this.in = -1;
		}

		return ret;
	}

	/**
	 * Reads up to <code>len</code> bytes of data from this piped input stream into an array of bytes. Less than
	 * <code>len</code> bytes will be read if the end of the data stream is reached or if <code>len</code> exceeds the
	 * pipe's buffer size. If <code>len </code> is zero, then no bytes are read and 0 is returned; otherwise, the method
	 * blocks until at least 1 byte of input is available, end of the stream has been detected, or an exception is
	 * thrown.
	 *
	 * @param b
	 *            the buffer into which the data is read.
	 * @param off
	 *            the start offset in the destination array <code>b</code>
	 * @param len
	 *            the maximum number of bytes read.
	 * @return the total number of bytes read into the buffer, or <code>-1</code> if there is no more data because the
	 *         end of the stream has been reached.
	 * @exception NullPointerException
	 *                If <code>b</code> is <code>null</code>.
	 * @exception IndexOutOfBoundsException
	 *                If <code>off</code> is negative, <code>len</code> is negative, or <code>len</code> is greater than
	 *                <code>b.length - off</code>
	 * @exception IOException
	 *                if the pipe is <a href="#BROKEN"> <code>broken</code></a>,
	 *                {@link #connect(java.io.PipedOutputStream) unconnected}, closed, or if an I/O error occurs.
	 */
	@Override
	public synchronized int read(byte b[], int off, int len) throws IOException {
		// b array is not allowed to be null when Null Analysis is enabled.
		// NullPointerException will be thrown by the VM at b.length() field access.
		int length = b.length;
		if (off < 0 || len < 0 || len > length - off) {
			throw new IndexOutOfBoundsException();
		} else if (len == 0) {
			return 0;
		}

		/* possibly wait on the first character */
		int c = read();
		if (c < 0) {
			return -1;
		}
		b[off] = (byte) c;
		int rlen = 1;
		while ((this.in >= 0) && (len > 1)) {

			int available;

			if (this.in > this.out) {
				available = Math.min((this.buffer.length - this.out), (this.in - this.out));
			} else {
				available = this.buffer.length - this.out;
			}

			// A byte is read beforehand outside the loop
			if (available > (len - 1)) {
				available = len - 1;
			}
			System.arraycopy(this.buffer, this.out, b, off + rlen, available);
			this.out += available;
			rlen += available;
			len -= available;

			if (this.out >= this.buffer.length) {
				this.out = 0;
			}
			if (this.in == this.out) {
				/* now empty */
				this.in = -1;
			}
		}
		return rlen;
	}

	/**
	 * Returns the number of bytes that can be read from this input stream without blocking.
	 *
	 * @return the number of bytes that can be read from this input stream without blocking, or {@code 0} if this input
	 *         stream has been closed by invoking its {@link #close()} method, or if the pipe is
	 *         {@link #connect(java.io.PipedOutputStream) unconnected}, or <a href="#BROKEN"> <code>broken</code></a>.
	 *
	 * @exception IOException
	 *                if an I/O error occurs.
	 * @since JDK1.0.2
	 */
	@Override
	public synchronized int available() throws IOException {
		if (this.in < 0) {
			return 0;
		} else if (this.in == this.out) {
			return this.buffer.length;
		} else if (this.in > this.out) {
			return this.in - this.out;
		} else {
			return this.in + this.buffer.length - this.out;
		}
	}

	/**
	 * Closes this piped input stream and releases any system resources associated with the stream.
	 *
	 * @exception IOException
	 *                if an I/O error occurs.
	 */
	@Override
	public void close() throws IOException {
		this.closedByReader = true;
		synchronized (this) {
			this.in = -1;
		}
	}
}
