Merge pull request #414 from oising/readconsoleinputstream-demo
ReadConsoleInputStream demo
This commit is contained in:
commit
2e0fd58bc5
58
tools/ReadConsoleInputStream/ConcurrentBoundedQueue.cs
Normal file
58
tools/ReadConsoleInputStream/ConcurrentBoundedQueue.cs
Normal file
|
@ -0,0 +1,58 @@
|
|||
using System;
|
||||
using System.Collections.Concurrent;
|
||||
using System.Collections.Generic;
|
||||
|
||||
namespace Samples.Terminal
|
||||
{
|
||||
/// <summary>
|
||||
/// Implements a bounded queue that won't block on overflow; instead the oldest item is discarded.
|
||||
/// </summary>
|
||||
/// <typeparam name="T"></typeparam>
|
||||
public class ConcurrentBoundedQueue<T> : ConcurrentQueue<T>
|
||||
{
|
||||
public ConcurrentBoundedQueue(int capacity)
|
||||
{
|
||||
Capacity = GetAlignedCapacity(capacity);
|
||||
}
|
||||
|
||||
public ConcurrentBoundedQueue(IEnumerable<T> collection, int capacity) : base(collection)
|
||||
{
|
||||
Capacity = GetAlignedCapacity(capacity);
|
||||
}
|
||||
|
||||
private int GetAlignedCapacity(int n)
|
||||
{
|
||||
if (n < 2)
|
||||
{
|
||||
throw new ArgumentException("Capacity must be at least 2");
|
||||
}
|
||||
|
||||
var f = Math.Log(n, 2);
|
||||
var p = Math.Ceiling(f);
|
||||
|
||||
return (int) Math.Pow(2, p);
|
||||
}
|
||||
|
||||
public new void Enqueue(T item)
|
||||
{
|
||||
// if we're about to overflow, dump oldest item
|
||||
if (Count >= Capacity)
|
||||
{
|
||||
lock (this)
|
||||
{
|
||||
while (Count >= Capacity)
|
||||
{
|
||||
TryDequeue(out _);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
base.Enqueue(item);
|
||||
}
|
||||
|
||||
public int Capacity
|
||||
{
|
||||
get; private set;
|
||||
}
|
||||
}
|
||||
}
|
31
tools/ReadConsoleInputStream/NativeMethods.cs
Normal file
31
tools/ReadConsoleInputStream/NativeMethods.cs
Normal file
|
@ -0,0 +1,31 @@
|
|||
using System;
|
||||
using System.Diagnostics;
|
||||
using System.Runtime.InteropServices;
|
||||
|
||||
namespace Samples.Terminal
|
||||
{
|
||||
internal static class NativeMethods
|
||||
{
|
||||
private static int MakeHRFromErrorCode(int errorCode)
|
||||
{
|
||||
// Don't convert it if it is already an HRESULT
|
||||
if ((0xFFFF0000 & errorCode) != 0)
|
||||
{
|
||||
Debug.Assert(false, "errorCode is already HRESULT");
|
||||
return errorCode;
|
||||
}
|
||||
|
||||
return unchecked(((int)0x80070000) | errorCode);
|
||||
}
|
||||
|
||||
internal static Exception GetExceptionForWin32Error(int errorCode)
|
||||
{
|
||||
return Marshal.GetExceptionForHR(MakeHRFromErrorCode(errorCode));
|
||||
}
|
||||
|
||||
internal static Exception GetExceptionForLastWin32Error()
|
||||
{
|
||||
return GetExceptionForWin32Error(Marshal.GetLastWin32Error());
|
||||
}
|
||||
}
|
||||
}
|
147
tools/ReadConsoleInputStream/Program.cs
Normal file
147
tools/ReadConsoleInputStream/Program.cs
Normal file
|
@ -0,0 +1,147 @@
|
|||
/*
|
||||
* This is a demo that shows how we can have a stream-oriented view of characters from the console
|
||||
* while also listening to console events like mouse, menu, focus, buffer/viewport(1) resize events.
|
||||
*
|
||||
* This has always been tricky to do because ReadConsoleW/A doesn't allow retrieving events.
|
||||
* Only ReadConsoleInputW/A returns events, but isn't stream-oriented. Using both doesn't work because
|
||||
* ReadConsoleW/A flushes the input queue, meaning calls to ReadConsoleInputW/A will wait forever.
|
||||
*
|
||||
* I do this by deriving a new Stream class which wraps ReadConsoleInputW and accepts a provider/consumer
|
||||
* implementation of BlockingCollection<Kernel32.INPUT_RECORD>. This allows asynchronous monitoring of
|
||||
* console events while simultaneously streaming the character input. I also use Mark Gravell's great
|
||||
* System.IO.Pipelines utility classes (2) and David Hall's excellent P/Invoke wrappers (3) to make this
|
||||
* demo cleaner to read; both are pulled from NuGet.
|
||||
*
|
||||
* (1) in versions of windows 10 prior to 1809, the buffer resize event only fires for enlarging
|
||||
* the viewport, as this would cause the buffer to be enlarged too. Now it fires even when
|
||||
* shrinking the viewport, which won't change the buffer size.
|
||||
*
|
||||
* (2) https://github.com/mgravell/Pipelines.Sockets.Unofficial
|
||||
* https://www.nuget.org/packages/Pipelines.Sockets.Unofficial
|
||||
*
|
||||
* (3) https://github.com/dahall/Vanara
|
||||
* https://www.nuget.org/packages/Vanara.Pinvoke.Kernel32
|
||||
*
|
||||
* Oisin Grehan - 2019/4/21
|
||||
*
|
||||
* https://twitter.com/oising
|
||||
* https://github.com/oising
|
||||
*/
|
||||
|
||||
using System;
|
||||
using System.Collections.Concurrent;
|
||||
using System.IO;
|
||||
using System.Runtime.InteropServices;
|
||||
using System.Text;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
using Pipelines.Sockets.Unofficial;
|
||||
using Vanara.PInvoke;
|
||||
|
||||
namespace Samples.Terminal
|
||||
{
|
||||
internal class Program
|
||||
{
|
||||
private static async Task Main(string[] args)
|
||||
{
|
||||
// run for 90 seconds
|
||||
var timeout = TimeSpan.FromSeconds(90);
|
||||
|
||||
// in reality this will likely never be reached, but it is useful to guard against
|
||||
// conditions where the queue isn't drained, or not drained fast enough.
|
||||
const int maxNonKeyEventRetention = 128;
|
||||
|
||||
var source = new CancellationTokenSource(timeout);
|
||||
var token = source.Token;
|
||||
var handle = Kernel32.GetStdHandle(Kernel32.StdHandleType.STD_INPUT_HANDLE);
|
||||
|
||||
if (!Kernel32.GetConsoleMode(handle, out Kernel32.CONSOLE_INPUT_MODE mode))
|
||||
throw NativeMethods.GetExceptionForWin32Error(Marshal.GetLastWin32Error());
|
||||
|
||||
mode |= Kernel32.CONSOLE_INPUT_MODE.ENABLE_WINDOW_INPUT;
|
||||
mode |= Kernel32.CONSOLE_INPUT_MODE.ENABLE_VIRTUAL_TERMINAL_INPUT;
|
||||
mode &= ~Kernel32.CONSOLE_INPUT_MODE.ENABLE_ECHO_INPUT;
|
||||
mode &= ~Kernel32.CONSOLE_INPUT_MODE.ENABLE_LINE_INPUT;
|
||||
|
||||
if (!Kernel32.SetConsoleMode(handle, mode))
|
||||
throw NativeMethods.GetExceptionForLastWin32Error();
|
||||
|
||||
// base our provider/consumer on a bounded queue to keep memory usage under control
|
||||
var events = new BlockingCollection<Kernel32.INPUT_RECORD>(
|
||||
new ConcurrentBoundedQueue<Kernel32.INPUT_RECORD>(maxNonKeyEventRetention));
|
||||
|
||||
// Task that will consume non-key events asynchronously
|
||||
var consumeEvents = Task.Run(() =>
|
||||
{
|
||||
Console.WriteLine("consumeEvents started");
|
||||
|
||||
try
|
||||
{
|
||||
while (!events.IsCompleted)
|
||||
{
|
||||
// blocking call
|
||||
var record = events.Take(token);
|
||||
|
||||
Console.WriteLine("record: {0}",
|
||||
Enum.GetName(typeof(Kernel32.EVENT_TYPE), record.EventType));
|
||||
}
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
// timeout
|
||||
}
|
||||
|
||||
Console.WriteLine("consumeEvents ended");
|
||||
}, token);
|
||||
|
||||
// Task that will watch for key events while feeding non-key events into our provider/consumer collection
|
||||
var readInputAndProduceEvents = Task.Run(async () =>
|
||||
{
|
||||
//So, this is the key point - we cannot use the following or we lose all non-key events:
|
||||
// Stream stdin = Console.OpenStandardInput();
|
||||
|
||||
// get a unicode character stream over console input
|
||||
Stream stdin = new ReadConsoleInputStream(handle, events);
|
||||
|
||||
// wrap in a System.IO.Pipelines.PipeReader to get clean async and span/memory usage
|
||||
var reader = StreamConnection.GetReader(stdin);
|
||||
|
||||
while (!token.IsCancellationRequested)
|
||||
{
|
||||
// blocking call
|
||||
var result = await reader.ReadAsync(token);
|
||||
|
||||
if (result.IsCanceled)
|
||||
break;
|
||||
|
||||
var sequence = result.Buffer;
|
||||
var segment = sequence.Start;
|
||||
|
||||
while (sequence.TryGet(ref segment, out var mem))
|
||||
{
|
||||
// decode back from unicode
|
||||
var datum = Encoding.Unicode.GetString(mem.Span);
|
||||
Console.Write(datum);
|
||||
}
|
||||
|
||||
reader.AdvanceTo(sequence.End);
|
||||
}
|
||||
}, token);
|
||||
|
||||
Console.WriteLine("Running");
|
||||
|
||||
try
|
||||
{
|
||||
await Task.WhenAll(consumeEvents, readInputAndProduceEvents);
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
// timeout
|
||||
}
|
||||
|
||||
Console.WriteLine("press any key...");
|
||||
Console.ReadKey(true);
|
||||
}
|
||||
}
|
||||
}
|
198
tools/ReadConsoleInputStream/ReadConsoleInputStream.cs
Normal file
198
tools/ReadConsoleInputStream/ReadConsoleInputStream.cs
Normal file
|
@ -0,0 +1,198 @@
|
|||
using System;
|
||||
using System.Collections.Concurrent;
|
||||
using System.Diagnostics;
|
||||
using System.IO;
|
||||
|
||||
using Vanara.PInvoke;
|
||||
|
||||
namespace Samples.Terminal
|
||||
{
|
||||
/// <summary>
|
||||
/// Provides a Stream-oriented view over the console's input buffer key events
|
||||
/// while also collecting out of band events like buffer resize, menu etc in
|
||||
/// a caller-provided BlockingCollection.
|
||||
/// </summary>
|
||||
/// <remarks>The buffer contains unicode chars, not 8 bit CP encoded chars as we rely on ReadConsoleInputW.</remarks>
|
||||
public sealed class ReadConsoleInputStream : Stream
|
||||
{
|
||||
private const int BufferSize = 256;
|
||||
private const int BytesPerWChar = 2;
|
||||
private readonly BlockingCollection<Kernel32.INPUT_RECORD> _nonKeyEvents;
|
||||
private IntPtr _handle;
|
||||
|
||||
/// <summary>
|
||||
/// Creates an instance of ReadConsoleInputStream over the standard handle for StdIn.
|
||||
/// </summary>
|
||||
/// <param name="nonKeyEvents">A BlockingCollection provider/consumer collection for collecting non key events.</param>
|
||||
public ReadConsoleInputStream(BlockingCollection<Kernel32.INPUT_RECORD> nonKeyEvents) :
|
||||
this(Kernel32.GetStdHandle(Kernel32.StdHandleType.STD_INPUT_HANDLE), nonKeyEvents)
|
||||
{
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Creates an instance of ReadConsoleInputStream over a caller-provided standard handle for stdin.
|
||||
/// </summary>
|
||||
/// <param name="handle">A HFILE handle representing StdIn</param>
|
||||
/// <param name="nonKeyEvents">A BlockingCollection provider/consumer collection for collecting non key events.</param>
|
||||
internal ReadConsoleInputStream(HFILE handle,
|
||||
BlockingCollection<Kernel32.INPUT_RECORD> nonKeyEvents)
|
||||
{
|
||||
Debug.Assert(handle.IsInvalid == false, "handle.IsInvalid == false");
|
||||
|
||||
_handle = handle.DangerousGetHandle();
|
||||
_nonKeyEvents = nonKeyEvents;
|
||||
}
|
||||
|
||||
public override bool CanRead { get; } = true;
|
||||
|
||||
public override bool CanWrite => false;
|
||||
|
||||
public override bool CanSeek => false;
|
||||
|
||||
public override long Length => throw new NotSupportedException("Seek not supported.");
|
||||
|
||||
public override long Position
|
||||
{
|
||||
get => throw new NotSupportedException("Seek not supported.");
|
||||
set => throw new NotSupportedException("Seek not supported.");
|
||||
}
|
||||
|
||||
protected override void Dispose(bool disposing)
|
||||
{
|
||||
_handle = IntPtr.Zero;
|
||||
|
||||
base.Dispose(disposing);
|
||||
}
|
||||
|
||||
public override int Read(byte[] buffer, int offset, int count)
|
||||
{
|
||||
ValidateRead(buffer, offset, count);
|
||||
|
||||
Debug.Assert(offset >= 0, "offset >= 0");
|
||||
Debug.Assert(count >= 0, "count >= 0");
|
||||
Debug.Assert(buffer != null, "bytes != null");
|
||||
|
||||
// Don't corrupt memory when multiple threads are erroneously writing
|
||||
// to this stream simultaneously.
|
||||
if (buffer.Length - offset < count)
|
||||
throw new IndexOutOfRangeException("IndexOutOfRange_IORaceCondition");
|
||||
|
||||
int bytesRead;
|
||||
int ret;
|
||||
|
||||
if (buffer.Length == 0)
|
||||
{
|
||||
bytesRead = 0;
|
||||
ret = Win32Error.ERROR_SUCCESS;
|
||||
}
|
||||
else
|
||||
{
|
||||
var charsRead = 0;
|
||||
bytesRead = 0;
|
||||
|
||||
var records = new Kernel32.INPUT_RECORD[BufferSize];
|
||||
|
||||
// begin input loop
|
||||
do
|
||||
{
|
||||
var readSuccess = Kernel32.ReadConsoleInput(_handle, records, BufferSize, out var recordsRead);
|
||||
Debug.WriteLine("Read {0} input record(s)", recordsRead);
|
||||
|
||||
// some of the arithmetic here is deliberately more explicit than it needs to be
|
||||
// in order to show how 16-bit unicode WCHARs are packed into the buffer. The console
|
||||
// subsystem is one of the last bastions of UCS-2, so until UTF-16 is fully adopted
|
||||
// the two-byte character assumptions below will hold.
|
||||
if (readSuccess && recordsRead > 0)
|
||||
{
|
||||
for (var index = 0; index < recordsRead; index++)
|
||||
{
|
||||
var record = records[index];
|
||||
|
||||
if (record.EventType == Kernel32.EVENT_TYPE.KEY_EVENT)
|
||||
{
|
||||
// skip key up events - if not, every key will be duped in the stream
|
||||
if (record.Event.KeyEvent.bKeyDown == false) continue;
|
||||
|
||||
// pack ucs-2/utf-16le/unicode chars into position in our byte[] buffer.
|
||||
var glyph = (ushort) record.Event.KeyEvent.uChar;
|
||||
|
||||
var lsb = (byte) (glyph & 0xFFu);
|
||||
var msb = (byte) ((glyph >> 8) & 0xFFu);
|
||||
|
||||
// ensure we accommodate key repeat counts
|
||||
for (var n = 0; n < record.Event.KeyEvent.wRepeatCount; n++)
|
||||
{
|
||||
buffer[offset + charsRead * BytesPerWChar] = lsb;
|
||||
buffer[offset + charsRead * BytesPerWChar + 1] = msb;
|
||||
|
||||
charsRead++;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
// ignore focus events; not doing so makes debugging absolutely hilarious
|
||||
// when breakpoints repeatedly cause focus events to occur as your view toggles
|
||||
// between IDE and console.
|
||||
if (record.EventType != Kernel32.EVENT_TYPE.FOCUS_EVENT)
|
||||
{
|
||||
// I assume success adding records - this is not so critical
|
||||
// if it is critical to you, loop on this with a miniscule delay
|
||||
_nonKeyEvents.TryAdd(record);
|
||||
}
|
||||
}
|
||||
}
|
||||
bytesRead = charsRead * BytesPerWChar;
|
||||
}
|
||||
else
|
||||
{
|
||||
Debug.Assert(bytesRead == 0, "bytesRead == 0");
|
||||
}
|
||||
|
||||
} while (bytesRead == 0);
|
||||
|
||||
Debug.WriteLine("Read {0} character(s)", charsRead);
|
||||
|
||||
ret = Win32Error.ERROR_SUCCESS;
|
||||
}
|
||||
|
||||
var errCode = ret;
|
||||
if (Win32Error.ERROR_SUCCESS != errCode)
|
||||
throw NativeMethods.GetExceptionForWin32Error(errCode);
|
||||
|
||||
return bytesRead;
|
||||
}
|
||||
|
||||
public override void Write(byte[] buffer, int offset, int count)
|
||||
{
|
||||
throw new NotImplementedException("Write operations not implemented.");
|
||||
}
|
||||
|
||||
public override void Flush()
|
||||
{
|
||||
throw new NotSupportedException("Flush/Write not supported.");
|
||||
}
|
||||
|
||||
public override void SetLength(long value)
|
||||
{
|
||||
throw new NotSupportedException("Seek not supported.");
|
||||
}
|
||||
|
||||
public override long Seek(long offset, SeekOrigin origin)
|
||||
{
|
||||
throw new NotSupportedException("Seek not supported.");
|
||||
}
|
||||
|
||||
private void ValidateRead(byte[] buffer, int offset, int count)
|
||||
{
|
||||
if (buffer == null)
|
||||
throw new ArgumentNullException(nameof(buffer));
|
||||
if (offset < 0 || count < 0)
|
||||
throw new ArgumentOutOfRangeException(offset < 0 ? nameof(offset) : nameof(count),
|
||||
"offset or count cannot be negative numbers.");
|
||||
if (buffer.Length - offset < count)
|
||||
throw new ArgumentException("invalid offset length.");
|
||||
|
||||
if (!CanRead) throw new NotSupportedException("Get read not supported.");
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,15 @@
|
|||
<Project Sdk="Microsoft.NET.Sdk">
|
||||
|
||||
<PropertyGroup>
|
||||
<OutputType>Exe</OutputType>
|
||||
<TargetFramework>netcoreapp2.2</TargetFramework>
|
||||
<LangVersion>latest</LangVersion>
|
||||
<RootNamespace>Samples.Terminal</RootNamespace>
|
||||
</PropertyGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<PackageReference Include="Pipelines.Sockets.Unofficial" Version="2.0.22" />
|
||||
<PackageReference Include="Vanara.PInvoke.Kernel32" Version="2.3.6" />
|
||||
</ItemGroup>
|
||||
|
||||
</Project>
|
25
tools/ReadConsoleInputStream/ReadConsoleInputStreamDemo.sln
Normal file
25
tools/ReadConsoleInputStream/ReadConsoleInputStreamDemo.sln
Normal file
|
@ -0,0 +1,25 @@
|
|||
|
||||
Microsoft Visual Studio Solution File, Format Version 12.00
|
||||
# Visual Studio Version 16
|
||||
VisualStudioVersion = 16.0.28803.156
|
||||
MinimumVisualStudioVersion = 10.0.40219.1
|
||||
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ReadConsoleInputStreamDemo", "ReadConsoleInputStreamDemo.csproj", "{62F500DE-4F06-4B46-B7AF-02AF21296F00}"
|
||||
EndProject
|
||||
Global
|
||||
GlobalSection(SolutionConfigurationPlatforms) = preSolution
|
||||
Debug|Any CPU = Debug|Any CPU
|
||||
Release|Any CPU = Release|Any CPU
|
||||
EndGlobalSection
|
||||
GlobalSection(ProjectConfigurationPlatforms) = postSolution
|
||||
{62F500DE-4F06-4B46-B7AF-02AF21296F00}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
|
||||
{62F500DE-4F06-4B46-B7AF-02AF21296F00}.Debug|Any CPU.Build.0 = Debug|Any CPU
|
||||
{62F500DE-4F06-4B46-B7AF-02AF21296F00}.Release|Any CPU.ActiveCfg = Release|Any CPU
|
||||
{62F500DE-4F06-4B46-B7AF-02AF21296F00}.Release|Any CPU.Build.0 = Release|Any CPU
|
||||
EndGlobalSection
|
||||
GlobalSection(SolutionProperties) = preSolution
|
||||
HideSolutionNode = FALSE
|
||||
EndGlobalSection
|
||||
GlobalSection(ExtensibilityGlobals) = postSolution
|
||||
SolutionGuid = {55A9793B-D717-4A6E-A8FE-ABC6CD3B17BA}
|
||||
EndGlobalSection
|
||||
EndGlobal
|
Loading…
Reference in a new issue