initial commit for tools/readconsoleinputstream PR

This commit is contained in:
oising 2019-04-21 13:31:36 -04:00
parent 5456666d35
commit cade139e0c
5 changed files with 445 additions and 0 deletions

View file

@ -0,0 +1,62 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
namespace Nivot.Terminal
{
/// <summary>
/// Implements a circular buffer.
/// </summary>
/// <typeparam name="T"></typeparam>
public class ConcurrentCircularQueue<T> : ConcurrentQueue<T>
{
public ConcurrentCircularQueue(int capacity)
{
Capacity = GetAlignedCapacity(capacity);
}
/// <summary>
///
/// </summary>
/// <param name="collection"></param>
/// <param name="capacity"></param>
public ConcurrentCircularQueue(IEnumerable<T> collection, int capacity) : base(collection)
{
Capacity = GetAlignedCapacity(capacity);
}
private int GetAlignedCapacity(int n)
{
if (n < 2)
{
throw new ArgumentException("Capacity must be at least 2");
}
var f = Math.Log(n, 2);
var p = Math.Ceiling(f);
return (int) Math.Pow(2, p);
}
public new void Enqueue(T item)
{
if (Count >= Capacity)
{
lock (this)
{
while (Count >= Capacity)
{
TryDequeue(out _);
}
}
}
base.Enqueue(item);
}
public int Capacity
{
get; private set;
}
}
}

View file

@ -0,0 +1,22 @@
using System;
using System.Runtime.InteropServices;
namespace Nivot.Terminal
{
internal static class NativeMethods
{
private static int MakeHRFromErrorCode(int errorCode)
{
// Don't convert it if it is already an HRESULT
if ((0xFFFF0000 & errorCode) != 0)
return errorCode;
return unchecked(((int)0x80070000) | errorCode);
}
internal static Exception GetExceptionForWin32Error(int errorCode)
{
return Marshal.GetExceptionForHR(MakeHRFromErrorCode(errorCode));
}
}
}

View file

@ -0,0 +1,151 @@
/*
* This is a demo that shows how we can have a stream-oriented view of characters from the console
* while also listening to console events like mouse, menu, focus, buffer/viewport(1) resize events.
*
* This has always been tricky to do because ReadConsoleW/A doesn't allow retrieving events.
* Only ReadConsoleInputW/A returns events, but isn't stream-oriented. Using both doesn't work because
* ReadConsoleW/A flushes the input queue, meaning calls to ReadConsoleInputW/A will wait forever.
*
* I do this by deriving a new Stream class which wraps ReadConsoleInputW and accepts a provider/consumer
* implementation of BlockingCollection<Kernel32.INPUT_RECORD>. This allows asynchronous monitoring of
* console events while simultaneously streaming the character input. I also use Mark Gravell's great
* System.IO.Pipelines utility classes (2) and David Hall's excellent P/Invoke wrappers (3) to make this
* demo cleaner to read; both are pulled from NuGet.
*
* (1) in versions of windows 10 prior to 1809, the buffer resize event only fires for enlarging
* the viewport, as this would cause the buffer to be enlarged too. Now it fires even when
* shrinking the viewport, which won't change the buffer size.
*
* (2) https://github.com/mgravell/Pipelines.Sockets.Unofficial
* https://www.nuget.org/packages/Pipelines.Sockets.Unofficial
*
* (3) https://github.com/dahall/Vanara
* https://www.nuget.org/packages/Vanara.Pinvoke.Kernel32
*
* Oisin Grehan - 2019/4/21
*
* https://twitter.com/oising
* https://github.com/oising
*/
using System;
using System.Collections.Concurrent;
using System.IO;
using System.Runtime.InteropServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Pipelines.Sockets.Unofficial;
using Vanara.PInvoke;
namespace Nivot.Terminal
{
internal class Program
{
private static async Task Main(string[] args)
{
// run for 90 seconds
const int timeout = 90000;
var source = new CancellationTokenSource(timeout);
var token = source.Token;
var handle = Kernel32.GetStdHandle(Kernel32.StdHandleType.STD_INPUT_HANDLE);
if (!Kernel32.GetConsoleMode(handle, out Kernel32.CONSOLE_INPUT_MODE mode))
throw NativeMethods.GetExceptionForWin32Error(Marshal.GetLastWin32Error());
// enable VT sequences so cursor movement etc is encapsulated in the stream
mode |= Kernel32.CONSOLE_INPUT_MODE.ENABLE_WINDOW_INPUT;
mode |= Kernel32.CONSOLE_INPUT_MODE.ENABLE_VIRTUAL_TERMINAL_INPUT;
mode &= ~Kernel32.CONSOLE_INPUT_MODE.ENABLE_ECHO_INPUT;
mode &= ~Kernel32.CONSOLE_INPUT_MODE.ENABLE_LINE_INPUT;
if (!Kernel32.SetConsoleMode(handle, mode))
throw NativeMethods.GetExceptionForWin32Error(Marshal.GetLastWin32Error());
// set utf-8 cp
if (!Kernel32.SetConsoleCP(65001))
throw NativeMethods.GetExceptionForWin32Error(Marshal.GetLastWin32Error());
if (!Kernel32.SetConsoleOutputCP(65001))
throw NativeMethods.GetExceptionForWin32Error(Marshal.GetLastWin32Error());
// base our provider/consumer on a circular buffer to keep memory usage under control
var events = new BlockingCollection<Kernel32.INPUT_RECORD>(
new ConcurrentCircularQueue<Kernel32.INPUT_RECORD>(256));
// Task that will consume non-key events asynchronously
var consumeEvents = Task.Run(() =>
{
Console.WriteLine("consumeEvents started");
try
{
while (!events.IsCompleted)
{
// blocking call
var record = events.Take(token);
Console.WriteLine("record: {0}",
Enum.GetName(typeof(Kernel32.EVENT_TYPE), record.EventType));
}
}
catch (OperationCanceledException)
{
// timeout
}
Console.WriteLine("consumeEvents ended");
}, token);
// Task that will watch for key events while feeding non-key events into our provider/consumer collection
var readInputAndProduceEvents = Task.Run(async () =>
{
//So, this is the key point - we cannot use the following or we lose all non-key events:
// Stream stdin = Console.OpenStandardInput();
// get a unicode character stream over console input
Stream stdin = new ReadConsoleInputStream(handle, events);
// wrap in a System.IO.Pipelines.PipeReader to get clean async and span/memory usage
var reader = StreamConnection.GetReader(stdin);
while (!token.IsCancellationRequested)
{
// blocking call
var result = await reader.ReadAsync(token);
if (result.IsCanceled)
break;
var sequence = result.Buffer;
var segment = sequence.Start;
while (sequence.TryGet(ref segment, out var mem))
{
// decode back from unicode (2 bytes per char)
var datum = Encoding.Unicode.GetString(mem.Span);
Console.Write(datum);
}
reader.AdvanceTo(sequence.End);
}
}, token);
Console.WriteLine("Running");
try
{
await Task.WhenAll(consumeEvents, readInputAndProduceEvents);
}
catch (OperationCanceledException)
{
// timeout
}
Console.WriteLine("press any key...");
Console.ReadKey(true);
}
}
}

View file

@ -0,0 +1,194 @@
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.IO;
using Vanara.PInvoke;
namespace Nivot.Terminal
{
/// <summary>
/// Provides a Stream-oriented view over the console's input buffer key events
/// while also collecting out of band events like buffer resize, menu etc in
/// a caller-provided BlockingCollection.
/// </summary>
/// <remarks>The buffer contains unicode chars, not 8 bit CP encoded chars as we rely on ReadConsoleInputW.</remarks>
public sealed class ReadConsoleInputStream : Stream
{
private const int BufferSize = 256;
private const int BytesPerWChar = 2;
private readonly BlockingCollection<Kernel32.INPUT_RECORD> _nonKeyEvents;
private IntPtr _handle;
/// <summary>
/// Creates an instance of ReadConsoleInputStream over the standard handle for StdIn.
/// </summary>
/// <param name="nonKeyEvents">A BlockingCollection provider/consumer collection for collecting non key events.</param>
public ReadConsoleInputStream(BlockingCollection<Kernel32.INPUT_RECORD> nonKeyEvents) :
this(Kernel32.GetStdHandle(Kernel32.StdHandleType.STD_INPUT_HANDLE), nonKeyEvents)
{
}
/// <summary>
/// Creates an instance of ReadConsoleInputStream over a caller-provided standard handle for stdin.
/// </summary>
/// <param name="handle">A HFILE handle representing StdIn</param>
/// <param name="nonKeyEvents">A BlockingCollection provider/consumer collection for collecting non key events.</param>
internal ReadConsoleInputStream(HFILE handle,
BlockingCollection<Kernel32.INPUT_RECORD> nonKeyEvents)
{
Debug.Assert(handle.IsInvalid == false, "handle.IsInvalid == false");
_handle = handle.DangerousGetHandle();
_nonKeyEvents = nonKeyEvents;
}
public override bool CanRead { get; } = true;
public override bool CanWrite => false;
public override bool CanSeek => false;
public override long Length => throw new NotSupportedException("Seek not supported.");
public override long Position
{
get => throw new NotSupportedException("Seek not supported.");
set => throw new NotSupportedException("Seek not supported.");
}
protected override void Dispose(bool disposing)
{
_handle = IntPtr.Zero;
base.Dispose(disposing);
}
public override int Read(byte[] buffer, int offset, int count)
{
ValidateRead(buffer, offset, count);
Debug.Assert(offset >= 0, "offset >= 0");
Debug.Assert(count >= 0, "count >= 0");
Debug.Assert(buffer != null, "bytes != null");
// Don't corrupt memory when multiple threads are erroneously writing
// to this stream simultaneously.
if (buffer.Length - offset < count)
throw new IndexOutOfRangeException("IndexOutOfRange_IORaceCondition");
int bytesRead;
int ret;
if (buffer.Length == 0)
{
bytesRead = 0;
ret = Win32Error.ERROR_SUCCESS;
}
else
{
var charsRead = 0;
bytesRead = 0;
var records = new Kernel32.INPUT_RECORD[BufferSize];
// begin input loop
waitForInput:
var readSuccess = Kernel32.ReadConsoleInput(_handle, records, 256, out var recordsRead);
Debug.WriteLine("Read {0} input record(s)", recordsRead);
if (readSuccess && recordsRead > 0)
{
for (var index = 0; index < recordsRead; index++)
{
var record = records[index];
if (record.EventType == Kernel32.EVENT_TYPE.KEY_EVENT)
{
// skip key up events - if not, every key will be duped in the stream
if (record.Event.KeyEvent.bKeyDown == false) continue;
// pack ucs-2/utf-16le/unicode chars into position in our byte[] buffer.
var glyph = (ushort) record.Event.KeyEvent.uChar;
var lsb = (byte) (glyph & 0xFFu);
var msb = (byte) ((glyph >> 8) & 0xFFu);
// ensure we accommodate key repeat counts
for (var n = 0; n < record.Event.KeyEvent.wRepeatCount; n++)
{
buffer[offset + charsRead * BytesPerWChar] = lsb;
buffer[offset + charsRead * BytesPerWChar + 1] = msb;
charsRead++;
}
}
else
{
// ignore focus events (not doing so makes debugging absolutely hilarious)
if (record.EventType != Kernel32.EVENT_TYPE.FOCUS_EVENT)
{
// I assume success adding records - this is not so critical
// if it is critical to you, loop on this with a miniscule delay
_nonKeyEvents.TryAdd(record);
}
}
}
bytesRead = charsRead * BytesPerWChar;
// we should continue to block if no chars read (KEY_EVENT)
// even though non-key events were dispatched
if (bytesRead == 0) goto waitForInput;
}
else
{
Debug.Assert(bytesRead == 0, "bytesRead == 0");
}
Debug.WriteLine("Read {0} character(s)", charsRead);
ret = Win32Error.ERROR_SUCCESS;
}
var errCode = ret;
if (Win32Error.ERROR_SUCCESS != errCode)
throw NativeMethods.GetExceptionForWin32Error(errCode);
return bytesRead;
}
public override void Write(byte[] buffer, int offset, int count)
{
throw new NotImplementedException("Write operations not implemented.");
}
public override void Flush()
{
throw new NotSupportedException("Flush/Write not supported.");
}
public override void SetLength(long value)
{
throw new NotSupportedException("Seek not supported.");
}
public override long Seek(long offset, SeekOrigin origin)
{
throw new NotSupportedException("Seek not supported.");
}
private void ValidateRead(byte[] buffer, int offset, int count)
{
if (buffer == null)
throw new ArgumentNullException(nameof(buffer));
if (offset < 0 || count < 0)
throw new ArgumentOutOfRangeException(offset < 0 ? nameof(offset) : nameof(count),
"offset or count cannot be negative numbers.");
if (buffer.Length - offset < count)
throw new ArgumentException("invalid offset length.");
if (!CanRead) throw new NotSupportedException("Get read not supported.");
}
}
}

View file

@ -0,0 +1,16 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp2.2</TargetFramework>
<LangVersion>latest</LangVersion>
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
<RootNamespace>Nivot.Terminal</RootNamespace>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Pipelines.Sockets.Unofficial" Version="2.0.22" />
<PackageReference Include="Vanara.PInvoke.Kernel32" Version="2.3.6" />
</ItemGroup>
</Project>