DelphiWebsockets/uROSimpleEventRepository.pas

138 lines
4.1 KiB
ObjectPascal

unit uROSimpleEventRepository;
interface
uses
uROEventRepository, uROClient, uROTypes, uROClientIntf,
uROHTTPWebsocketServer, uROSessions, Classes, SyncObjs;
type
TROSimpleWebsocketEventRepository = class(TInterfacedObject,
IROEventRepository)
private
FMessage: TROMessage;
FROServer: TROIndyHTTPWebsocketServer;
FEventCount: Integer;
protected
{IROEventRepository}
procedure AddSession(aSessionID : TGUID); overload;
procedure AddSession(aSessionID : TGUID; aEventSinkId: AnsiString); overload;
procedure RemoveSession(aSessionID : TGUID); overload;
procedure RemoveSession(aSessionID : TGUID; aEventSinkId: AnsiString); overload;
procedure StoreEventData(SourceSessionID : TGUID; Data : Binary;
const ExcludeSender: Boolean;
const ExcludeSessionList: Boolean;
const SessionList: String); overload;
procedure StoreEventData(SourceSessionID : TGUID; Data : Binary;
const ExcludeSender: Boolean;
const ExcludeSessionList: Boolean;
const SessionList: String;
const EventSinkId: AnsiString); overload;
function GetEventData(SessionID : TGUID; var TargetStream : Binary) : integer;
public
function GetEventWriter(const IID: TGUID): IROEventWriter;
property Message : TROMessage read FMessage write FMessage;
property ROServer: TROIndyHTTPWebsocketServer read FROServer write FROServer;
end;
implementation
uses
IdContext, IdIOHandlerWebsocket, Windows;
{ TSimpleEventRepository }
procedure TROSimpleWebsocketEventRepository.AddSession(aSessionID: TGUID);
begin
//no session
end;
procedure TROSimpleWebsocketEventRepository.AddSession(aSessionID: TGUID;
aEventSinkId: AnsiString);
begin
//no session
end;
procedure TROSimpleWebsocketEventRepository.RemoveSession(aSessionID: TGUID;
aEventSinkId: AnsiString);
begin
//no session
end;
procedure TROSimpleWebsocketEventRepository.RemoveSession(aSessionID: TGUID);
begin
//no session
end;
function TROSimpleWebsocketEventRepository.GetEventWriter(
const IID: TGUID): IROEventWriter;
var
lEventWriterClass: TROEventWriterClass;
begin
lEventWriterClass := FindEventWriterClass(IID);
if not assigned(lEventWriterClass) then exit;
result := lEventWriterClass.Create(fMessage, Self) as IROEventWriter;
end;
function TROSimpleWebsocketEventRepository.GetEventData(SessionID: TGUID;
var TargetStream: Binary): integer;
begin
Result := -1;
Assert(False);
end;
procedure TROSimpleWebsocketEventRepository.StoreEventData(SourceSessionID: TGUID;
Data: Binary; const ExcludeSender, ExcludeSessionList: Boolean;
const SessionList: String; const EventSinkId: AnsiString);
begin
StoreEventData(SourceSessionID, Data, ExcludeSender, ExcludeSessionList, SessionList);
end;
procedure TROSimpleWebsocketEventRepository.StoreEventData(SourceSessionID: TGUID;
Data: Binary; const ExcludeSender, ExcludeSessionList: Boolean;
const SessionList: String);
var
i, iEventNr: Integer;
LContext: TIdContext;
l: TList;
ws: TIdIOHandlerWebsocket;
begin
l := ROServer.IndyServer.Contexts.LockList;
try
if l.Count <= 0 then Exit;
iEventNr := -1 * InterlockedIncrement(FEventCount); //negative = event, positive is normal RO message
if iEventNr > 0 then
begin
InterlockedExchange(FEventCount, 0);
iEventNr := -1 * InterlockedIncrement(FEventCount); //negative = event, positive is normal RO message
end;
Assert(iEventNr < 0);
Data.Position := Data.Size;
Data.Write(C_ROWSNR, Length(C_ROWSNR));
Data.Write(iEventNr, SizeOf(iEventNr));
Data.Position := 0;
//direct write to ALL connections
for i := 0 to l.Count - 1 do
begin
LContext := TIdContext(l.Items[i]);
ws := (LContext.Connection.IOHandler as TIdIOHandlerWebsocket);
if not ws.IsWebsocket then Continue;
ws.Lock;
try
ws.Write(Data, wdtBinary);
finally
ws.Unlock;
end;
end;
finally
ROServer.IndyServer.Contexts.UnlockList;
end;
end;
end.