From cade139e0c3eaae2b6486704b61b765f32a0c11b Mon Sep 17 00:00:00 2001 From: oising Date: Sun, 21 Apr 2019 13:31:36 -0400 Subject: [PATCH] initial commit for tools/readconsoleinputstream PR --- .../ConcurrentCircularQueue.cs | 62 ++++++ tools/ReadConsoleInputStream/NativeMethods.cs | 22 ++ tools/ReadConsoleInputStream/Program.cs | 151 ++++++++++++++ .../ReadConsoleInputStream.cs | 194 ++++++++++++++++++ .../ReadConsoleInputStreamDemo.csproj | 16 ++ 5 files changed, 445 insertions(+) create mode 100644 tools/ReadConsoleInputStream/ConcurrentCircularQueue.cs create mode 100644 tools/ReadConsoleInputStream/NativeMethods.cs create mode 100644 tools/ReadConsoleInputStream/Program.cs create mode 100644 tools/ReadConsoleInputStream/ReadConsoleInputStream.cs create mode 100644 tools/ReadConsoleInputStream/ReadConsoleInputStreamDemo.csproj diff --git a/tools/ReadConsoleInputStream/ConcurrentCircularQueue.cs b/tools/ReadConsoleInputStream/ConcurrentCircularQueue.cs new file mode 100644 index 000000000..db14019f3 --- /dev/null +++ b/tools/ReadConsoleInputStream/ConcurrentCircularQueue.cs @@ -0,0 +1,62 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; + +namespace Nivot.Terminal +{ + /// + /// Implements a circular buffer. + /// + /// + public class ConcurrentCircularQueue : ConcurrentQueue + { + public ConcurrentCircularQueue(int capacity) + { + Capacity = GetAlignedCapacity(capacity); + } + + /// + /// + /// + /// + /// + public ConcurrentCircularQueue(IEnumerable collection, int capacity) : base(collection) + { + Capacity = GetAlignedCapacity(capacity); + } + + private int GetAlignedCapacity(int n) + { + if (n < 2) + { + throw new ArgumentException("Capacity must be at least 2"); + } + + var f = Math.Log(n, 2); + var p = Math.Ceiling(f); + + return (int) Math.Pow(2, p); + } + + public new void Enqueue(T item) + { + if (Count >= Capacity) + { + lock (this) + { + while (Count >= Capacity) + { + TryDequeue(out _); + } + } + } + + base.Enqueue(item); + } + + public int Capacity + { + get; private set; + } + } +} diff --git a/tools/ReadConsoleInputStream/NativeMethods.cs b/tools/ReadConsoleInputStream/NativeMethods.cs new file mode 100644 index 000000000..f08640f02 --- /dev/null +++ b/tools/ReadConsoleInputStream/NativeMethods.cs @@ -0,0 +1,22 @@ +using System; +using System.Runtime.InteropServices; + +namespace Nivot.Terminal +{ + internal static class NativeMethods + { + private static int MakeHRFromErrorCode(int errorCode) + { + // Don't convert it if it is already an HRESULT + if ((0xFFFF0000 & errorCode) != 0) + return errorCode; + + return unchecked(((int)0x80070000) | errorCode); + } + + internal static Exception GetExceptionForWin32Error(int errorCode) + { + return Marshal.GetExceptionForHR(MakeHRFromErrorCode(errorCode)); + } + } +} \ No newline at end of file diff --git a/tools/ReadConsoleInputStream/Program.cs b/tools/ReadConsoleInputStream/Program.cs new file mode 100644 index 000000000..d4f97f3ee --- /dev/null +++ b/tools/ReadConsoleInputStream/Program.cs @@ -0,0 +1,151 @@ +/* + * This is a demo that shows how we can have a stream-oriented view of characters from the console + * while also listening to console events like mouse, menu, focus, buffer/viewport(1) resize events. + * + * This has always been tricky to do because ReadConsoleW/A doesn't allow retrieving events. + * Only ReadConsoleInputW/A returns events, but isn't stream-oriented. Using both doesn't work because + * ReadConsoleW/A flushes the input queue, meaning calls to ReadConsoleInputW/A will wait forever. + * + * I do this by deriving a new Stream class which wraps ReadConsoleInputW and accepts a provider/consumer + * implementation of BlockingCollection. This allows asynchronous monitoring of + * console events while simultaneously streaming the character input. I also use Mark Gravell's great + * System.IO.Pipelines utility classes (2) and David Hall's excellent P/Invoke wrappers (3) to make this + * demo cleaner to read; both are pulled from NuGet. + * + * (1) in versions of windows 10 prior to 1809, the buffer resize event only fires for enlarging + * the viewport, as this would cause the buffer to be enlarged too. Now it fires even when + * shrinking the viewport, which won't change the buffer size. + * + * (2) https://github.com/mgravell/Pipelines.Sockets.Unofficial + * https://www.nuget.org/packages/Pipelines.Sockets.Unofficial + * + * (3) https://github.com/dahall/Vanara + * https://www.nuget.org/packages/Vanara.Pinvoke.Kernel32 + * + * Oisin Grehan - 2019/4/21 + * + * https://twitter.com/oising + * https://github.com/oising + */ + +using System; +using System.Collections.Concurrent; +using System.IO; +using System.Runtime.InteropServices; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +using Pipelines.Sockets.Unofficial; +using Vanara.PInvoke; + +namespace Nivot.Terminal +{ + internal class Program + { + private static async Task Main(string[] args) + { + // run for 90 seconds + const int timeout = 90000; + + var source = new CancellationTokenSource(timeout); + var token = source.Token; + var handle = Kernel32.GetStdHandle(Kernel32.StdHandleType.STD_INPUT_HANDLE); + + if (!Kernel32.GetConsoleMode(handle, out Kernel32.CONSOLE_INPUT_MODE mode)) + throw NativeMethods.GetExceptionForWin32Error(Marshal.GetLastWin32Error()); + + // enable VT sequences so cursor movement etc is encapsulated in the stream + mode |= Kernel32.CONSOLE_INPUT_MODE.ENABLE_WINDOW_INPUT; + mode |= Kernel32.CONSOLE_INPUT_MODE.ENABLE_VIRTUAL_TERMINAL_INPUT; + mode &= ~Kernel32.CONSOLE_INPUT_MODE.ENABLE_ECHO_INPUT; + mode &= ~Kernel32.CONSOLE_INPUT_MODE.ENABLE_LINE_INPUT; + + if (!Kernel32.SetConsoleMode(handle, mode)) + throw NativeMethods.GetExceptionForWin32Error(Marshal.GetLastWin32Error()); + + // set utf-8 cp + if (!Kernel32.SetConsoleCP(65001)) + throw NativeMethods.GetExceptionForWin32Error(Marshal.GetLastWin32Error()); + + if (!Kernel32.SetConsoleOutputCP(65001)) + throw NativeMethods.GetExceptionForWin32Error(Marshal.GetLastWin32Error()); + + // base our provider/consumer on a circular buffer to keep memory usage under control + var events = new BlockingCollection( + new ConcurrentCircularQueue(256)); + + // Task that will consume non-key events asynchronously + var consumeEvents = Task.Run(() => + { + Console.WriteLine("consumeEvents started"); + + try + { + while (!events.IsCompleted) + { + // blocking call + var record = events.Take(token); + + Console.WriteLine("record: {0}", + Enum.GetName(typeof(Kernel32.EVENT_TYPE), record.EventType)); + } + } + catch (OperationCanceledException) + { + // timeout + } + + Console.WriteLine("consumeEvents ended"); + }, token); + + // Task that will watch for key events while feeding non-key events into our provider/consumer collection + var readInputAndProduceEvents = Task.Run(async () => + { + //So, this is the key point - we cannot use the following or we lose all non-key events: + // Stream stdin = Console.OpenStandardInput(); + + // get a unicode character stream over console input + Stream stdin = new ReadConsoleInputStream(handle, events); + + // wrap in a System.IO.Pipelines.PipeReader to get clean async and span/memory usage + var reader = StreamConnection.GetReader(stdin); + + while (!token.IsCancellationRequested) + { + // blocking call + var result = await reader.ReadAsync(token); + + if (result.IsCanceled) + break; + + var sequence = result.Buffer; + var segment = sequence.Start; + + while (sequence.TryGet(ref segment, out var mem)) + { + // decode back from unicode (2 bytes per char) + var datum = Encoding.Unicode.GetString(mem.Span); + Console.Write(datum); + } + + reader.AdvanceTo(sequence.End); + } + }, token); + + Console.WriteLine("Running"); + + try + { + await Task.WhenAll(consumeEvents, readInputAndProduceEvents); + } + catch (OperationCanceledException) + { + // timeout + } + + Console.WriteLine("press any key..."); + Console.ReadKey(true); + } + } +} \ No newline at end of file diff --git a/tools/ReadConsoleInputStream/ReadConsoleInputStream.cs b/tools/ReadConsoleInputStream/ReadConsoleInputStream.cs new file mode 100644 index 000000000..64a99f737 --- /dev/null +++ b/tools/ReadConsoleInputStream/ReadConsoleInputStream.cs @@ -0,0 +1,194 @@ +using System; +using System.Collections.Concurrent; +using System.Diagnostics; +using System.IO; + +using Vanara.PInvoke; + +namespace Nivot.Terminal +{ + /// + /// Provides a Stream-oriented view over the console's input buffer key events + /// while also collecting out of band events like buffer resize, menu etc in + /// a caller-provided BlockingCollection. + /// + /// The buffer contains unicode chars, not 8 bit CP encoded chars as we rely on ReadConsoleInputW. + public sealed class ReadConsoleInputStream : Stream + { + private const int BufferSize = 256; + private const int BytesPerWChar = 2; + private readonly BlockingCollection _nonKeyEvents; + private IntPtr _handle; + + /// + /// Creates an instance of ReadConsoleInputStream over the standard handle for StdIn. + /// + /// A BlockingCollection provider/consumer collection for collecting non key events. + public ReadConsoleInputStream(BlockingCollection nonKeyEvents) : + this(Kernel32.GetStdHandle(Kernel32.StdHandleType.STD_INPUT_HANDLE), nonKeyEvents) + { + } + + /// + /// Creates an instance of ReadConsoleInputStream over a caller-provided standard handle for stdin. + /// + /// A HFILE handle representing StdIn + /// A BlockingCollection provider/consumer collection for collecting non key events. + internal ReadConsoleInputStream(HFILE handle, + BlockingCollection nonKeyEvents) + { + Debug.Assert(handle.IsInvalid == false, "handle.IsInvalid == false"); + + _handle = handle.DangerousGetHandle(); + _nonKeyEvents = nonKeyEvents; + } + + public override bool CanRead { get; } = true; + + public override bool CanWrite => false; + + public override bool CanSeek => false; + + public override long Length => throw new NotSupportedException("Seek not supported."); + + public override long Position + { + get => throw new NotSupportedException("Seek not supported."); + set => throw new NotSupportedException("Seek not supported."); + } + + protected override void Dispose(bool disposing) + { + _handle = IntPtr.Zero; + + base.Dispose(disposing); + } + + public override int Read(byte[] buffer, int offset, int count) + { + ValidateRead(buffer, offset, count); + + Debug.Assert(offset >= 0, "offset >= 0"); + Debug.Assert(count >= 0, "count >= 0"); + Debug.Assert(buffer != null, "bytes != null"); + + // Don't corrupt memory when multiple threads are erroneously writing + // to this stream simultaneously. + if (buffer.Length - offset < count) + throw new IndexOutOfRangeException("IndexOutOfRange_IORaceCondition"); + + int bytesRead; + int ret; + + if (buffer.Length == 0) + { + bytesRead = 0; + ret = Win32Error.ERROR_SUCCESS; + } + else + { + var charsRead = 0; + bytesRead = 0; + + var records = new Kernel32.INPUT_RECORD[BufferSize]; + + // begin input loop + waitForInput: + + var readSuccess = Kernel32.ReadConsoleInput(_handle, records, 256, out var recordsRead); + Debug.WriteLine("Read {0} input record(s)", recordsRead); + + if (readSuccess && recordsRead > 0) + { + for (var index = 0; index < recordsRead; index++) + { + var record = records[index]; + + if (record.EventType == Kernel32.EVENT_TYPE.KEY_EVENT) + { + // skip key up events - if not, every key will be duped in the stream + if (record.Event.KeyEvent.bKeyDown == false) continue; + + // pack ucs-2/utf-16le/unicode chars into position in our byte[] buffer. + var glyph = (ushort) record.Event.KeyEvent.uChar; + + var lsb = (byte) (glyph & 0xFFu); + var msb = (byte) ((glyph >> 8) & 0xFFu); + + // ensure we accommodate key repeat counts + for (var n = 0; n < record.Event.KeyEvent.wRepeatCount; n++) + { + buffer[offset + charsRead * BytesPerWChar] = lsb; + buffer[offset + charsRead * BytesPerWChar + 1] = msb; + + charsRead++; + } + } + else + { + // ignore focus events (not doing so makes debugging absolutely hilarious) + if (record.EventType != Kernel32.EVENT_TYPE.FOCUS_EVENT) + { + // I assume success adding records - this is not so critical + // if it is critical to you, loop on this with a miniscule delay + _nonKeyEvents.TryAdd(record); + } + } + } + + bytesRead = charsRead * BytesPerWChar; + + // we should continue to block if no chars read (KEY_EVENT) + // even though non-key events were dispatched + if (bytesRead == 0) goto waitForInput; + } + else + { + Debug.Assert(bytesRead == 0, "bytesRead == 0"); + } + + Debug.WriteLine("Read {0} character(s)", charsRead); + ret = Win32Error.ERROR_SUCCESS; + } + + var errCode = ret; + if (Win32Error.ERROR_SUCCESS != errCode) + throw NativeMethods.GetExceptionForWin32Error(errCode); + + return bytesRead; + } + + public override void Write(byte[] buffer, int offset, int count) + { + throw new NotImplementedException("Write operations not implemented."); + } + + public override void Flush() + { + throw new NotSupportedException("Flush/Write not supported."); + } + + public override void SetLength(long value) + { + throw new NotSupportedException("Seek not supported."); + } + + public override long Seek(long offset, SeekOrigin origin) + { + throw new NotSupportedException("Seek not supported."); + } + + private void ValidateRead(byte[] buffer, int offset, int count) + { + if (buffer == null) + throw new ArgumentNullException(nameof(buffer)); + if (offset < 0 || count < 0) + throw new ArgumentOutOfRangeException(offset < 0 ? nameof(offset) : nameof(count), + "offset or count cannot be negative numbers."); + if (buffer.Length - offset < count) + throw new ArgumentException("invalid offset length."); + + if (!CanRead) throw new NotSupportedException("Get read not supported."); + } + } +} \ No newline at end of file diff --git a/tools/ReadConsoleInputStream/ReadConsoleInputStreamDemo.csproj b/tools/ReadConsoleInputStream/ReadConsoleInputStreamDemo.csproj new file mode 100644 index 000000000..e6b445ee5 --- /dev/null +++ b/tools/ReadConsoleInputStream/ReadConsoleInputStreamDemo.csproj @@ -0,0 +1,16 @@ + + + + Exe + netcoreapp2.2 + latest + true + Nivot.Terminal + + + + + + + +