From 7d3b78b227dd05c4b91433b3f9397af0b0a7595f Mon Sep 17 00:00:00 2001 From: andremussche Date: Mon, 11 Nov 2013 21:14:42 +0100 Subject: [PATCH] first checkin --- IdHTTPWebsocketClient.pas | 1238 +++++++++++++++++++++++++++++++ IdIOHandlerWebsocket.pas | 771 ++++++++++++++++++++ IdServerBaseHandling.pas | 11 + IdServerIOHandlerWebsocket.pas | 46 ++ IdServerSocketIOHandling.pas | 194 +++++ IdServerWebsocketContext.pas | 81 +++ IdServerWebsocketHandling.pas | 317 ++++++++ IdSocketIOHandling.pas | 1239 ++++++++++++++++++++++++++++++++ IdWebsocketServer.pas | 156 ++++ 9 files changed, 4053 insertions(+) create mode 100644 IdHTTPWebsocketClient.pas create mode 100644 IdIOHandlerWebsocket.pas create mode 100644 IdServerBaseHandling.pas create mode 100644 IdServerIOHandlerWebsocket.pas create mode 100644 IdServerSocketIOHandling.pas create mode 100644 IdServerWebsocketContext.pas create mode 100644 IdServerWebsocketHandling.pas create mode 100644 IdSocketIOHandling.pas create mode 100644 IdWebsocketServer.pas diff --git a/IdHTTPWebsocketClient.pas b/IdHTTPWebsocketClient.pas new file mode 100644 index 0000000..efb0c5e --- /dev/null +++ b/IdHTTPWebsocketClient.pas @@ -0,0 +1,1238 @@ +unit IdHTTPWebsocketClient; + +interface + +uses + Classes, + IdHTTP, IdHashSHA1, IdIOHandler, + IdIOHandlerWebsocket, ExtCtrls, IdWinsock2, Generics.Collections, SyncObjs, + IdSocketIOHandling; + +type + TDataBinEvent = procedure(const aData: TStream) of object; + TDataStringEvent = procedure(const aData: string) of object; + + TIdHTTPWebsocketClient = class; + TSocketIOMsg = procedure(const AClient: TIdHTTPWebsocketClient; const aText: string; aMsgNr: Integer) of object; + + TIdSocketIOHandling_Ext = class(TIdSocketIOHandling) + end; + + TIdHTTPWebsocketClient = class(TIdHTTP) + private + FWSResourceName: string; + FHash: TIdHashSHA1; + FOnData: TDataBinEvent; + FOnTextData: TDataStringEvent; + function GetIOHandlerWS: TIdIOHandlerWebsocket; + procedure SetIOHandlerWS(const Value: TIdIOHandlerWebsocket); + procedure SetOnData(const Value: TDataBinEvent); + procedure SetOnTextData(const Value: TDataStringEvent); + protected + FSocketIOCompatible: Boolean; + FSocketIOHandshakeResponse: string; + FSocketIO: TIdSocketIOHandling_Ext; + FSocketIOContext: ISocketIOContext; + FHeartBeat: TTimer; + procedure HeartBeatTimer(Sender: TObject); + function GetSocketIO: TIdSocketIOHandling; + protected + procedure InternalUpgradeToWebsocket(aRaiseException: Boolean; out aFailedReason: string);virtual; + function MakeImplicitClientHandler: TIdIOHandler; override; + public + procedure AsyncDispatchEvent(const aEvent: TStream); overload; virtual; + procedure AsyncDispatchEvent(const aEvent: string); overload; virtual; + procedure ResetChannel; + public + procedure AfterConstruction; override; + destructor Destroy; override; + + function TryUpgradeToWebsocket: Boolean; + procedure UpgradeToWebsocket; + procedure Disconnect(ANotifyPeer: Boolean); override; + + property IOHandler: TIdIOHandlerWebsocket read GetIOHandlerWS write SetIOHandlerWS; + property OnBinData : TDataBinEvent read FOnData write SetOnData; + property OnTextData: TDataStringEvent read FOnTextData write SetOnTextData; + + //https://github.com/LearnBoost/socket.io-spec + property SocketIOCompatible: Boolean read FSocketIOCompatible write FSocketIOCompatible; + property SocketIO: TIdSocketIOHandling read GetSocketIO; + published + property Host; + property Port; + property WSResourceName: string read FWSResourceName write FWSResourceName; + end; + +// on error + TIdHTTPSocketIOClient_old = class(TIdHTTPWebsocketClient) + private + FOnConnected: TNotifyEvent; + FOnDisConnected: TNotifyEvent; + FOnSocketIOMsg: TSocketIOMsg; + FOnSocketIOEvent: TSocketIOMsg; + FOnSocketIOJson: TSocketIOMsg; + protected + FHeartBeat: TTimer; + procedure HeartBeatTimer(Sender: TObject); + + procedure InternalUpgradeToWebsocket(aRaiseException: Boolean; out aFailedReason: string);override; + public + procedure AsyncDispatchEvent(const aEvent: string); override; + public + procedure AfterConstruction; override; + destructor Destroy; override; + + procedure AutoConnect; + + property SocketIOHandshakeResponse: string read FSocketIOHandshakeResponse; + property OnConnected: TNotifyEvent read FOnConnected write FOnConnected; + property OnDisConnected: TNotifyEvent read FOnDisConnected write FOnDisConnected; + +// procedure ProcessSocketIORequest(const strmRequest: TStream); + property OnSocketIOMsg : TSocketIOMsg read FOnSocketIOMsg write FOnSocketIOMsg; + property OnSocketIOJson : TSocketIOMsg read FOnSocketIOJson write FOnSocketIOJson; + property OnSocketIOEvent: TSocketIOMsg read FOnSocketIOEvent write FOnSocketIOEvent; + end; + + TIdWebsocketMultiReadThread = class(TThread) + private + class var FInstance: TIdWebsocketMultiReadThread; + protected + FTempHandle: THandle; + FPendingBreak: Boolean; + Freadset, Fexceptionset: TFDSet; + Finterval: TTimeVal; + procedure InitSpecialEventSocket; + procedure ResetSpecialEventSocket; + procedure BreakSelectWait; + protected + FChannels: TThreadList; + procedure ReadFromAllChannels; + + procedure Execute; override; + public + procedure AfterConstruction;override; + destructor Destroy; override; + + procedure Terminate; + + procedure AddClient (aChannel: TIdHTTPWebsocketClient); + procedure RemoveClient(aChannel: TIdHTTPWebsocketClient); + + class function Instance: TIdWebsocketMultiReadThread; + class procedure RemoveInstance; + end; + + //async post data + TIdWebsocketDispatchThread = class(TThread) + private + class var FInstance: TIdWebsocketDispatchThread; + protected + FEvent: TEvent; + FEvents, FProcessing: TList; + procedure Execute; override; + public + procedure AfterConstruction;override; + destructor Destroy; override; + + procedure Terminate; + + procedure QueueEvent(aEvent: TThreadProcedure); + + class function Instance: TIdWebsocketDispatchThread; + end; + +implementation + +uses + IdCoderMIME, SysUtils, Math, IdException, IdStackConsts, IdStack, + IdStackBSDBase, IdGlobal, Windows, StrUtils, mcBaseNamedThread, + mcFinalizationHelper; + +//type +// TAnonymousThread = class(TThread) +// protected +// FThreadProc: TThreadProcedure; +// procedure Execute; override; +// public +// constructor Create(AThreadProc: TThreadProcedure); +// end; + +//procedure CreateAnonymousThread(AThreadProc: TThreadProcedure); +//begin +// TAnonymousThread.Create(AThreadProc); +//end; + +{ TAnonymousThread } + +//constructor TAnonymousThread.Create(AThreadProc: TThreadProcedure); +//begin +// FThreadProc := AThreadProc; +// FreeOnTerminate := True; +// inherited Create(False {direct start}); +//end; +// +//procedure TAnonymousThread.Execute; +//begin +// if Assigned(FThreadProc) then +// FThreadProc(); +//end; + +{ TIdHTTPWebsocketClient } + +procedure TIdHTTPWebsocketClient.AfterConstruction; +begin + inherited; + FHash := TIdHashSHA1.Create; + + IOHandler := TIdIOHandlerWebsocket.Create(nil); + ManagedIOHandler := True; + + FSocketIO := TIdSocketIOHandling_Ext.Create; + FHeartBeat := TTimer.Create(nil); + FHeartBeat.Enabled := False; + FHeartBeat.OnTimer := HeartBeatTimer; +end; + +procedure TIdHTTPWebsocketClient.AsyncDispatchEvent(const aEvent: TStream); +var + strmevent: TMemoryStream; +begin + if not Assigned(OnBinData) then Exit; + + strmevent := TMemoryStream.Create; + strmevent.CopyFrom(aEvent, aEvent.Size); + + //events during dispatch? channel is busy so offload event dispatching to different thread! + TIdWebsocketDispatchThread.Instance.QueueEvent( + procedure + begin + if Assigned(OnBinData) then + OnBinData(strmevent); + strmevent.Free; + end); +end; + +procedure TIdHTTPWebsocketClient.AsyncDispatchEvent(const aEvent: string); +begin + if FSocketIOCompatible then + FSocketIO.ProcessSocketIORequest(FSocketIOContext as TSocketIOContext, aEvent) + else + begin + if not Assigned(OnTextData) then Exit; + //events during dispatch? channel is busy so offload event dispatching to different thread! + TIdWebsocketDispatchThread.Instance.QueueEvent( + procedure + begin + if Assigned(OnTextData) then + OnTextData(aEvent); + end); + end; +end; + +destructor TIdHTTPWebsocketClient.Destroy; +var tmr: TObject; +begin + tmr := FHeartBeat; + FHeartBeat := nil; + TThread.Queue(nil, //otherwise free in other thread than created + procedure + begin + //FHeartBeat.Free; + tmr.Free; + end); + + TIdWebsocketMultiReadThread.Instance.RemoveClient(Self); + FSocketIO.Free; + FHash.Free; + inherited; +end; + +procedure TIdHTTPWebsocketClient.DisConnect(ANotifyPeer: Boolean); +begin + TIdWebsocketMultiReadThread.Instance.RemoveClient(Self); + + if ANotifyPeer and SocketIOCompatible then + FSocketIO.WriteDisConnect(FSocketIOContext as TSocketIOContext) + else + FSocketIO.FreeConnection(FSocketIOContext as TSocketIOContext); +// IInterface(FSocketIOContext)._Release; + FSocketIOContext := nil; + + if IOHandler <> nil then + begin + IOHandler.Lock; + try + IOHandler.IsWebsocket := False; + + inherited DisConnect(ANotifyPeer); + //clear buffer, other still "connected" + IOHandler.InputBuffer.Clear; + + //IOHandler.Free; + //IOHandler := TIdIOHandlerWebsocket.Create(nil); + finally + IOHandler.Unlock; + end; + end; +end; + +function TIdHTTPWebsocketClient.GetIOHandlerWS: TIdIOHandlerWebsocket; +begin +// if inherited IOHandler is TIdIOHandlerWebsocket then + Result := inherited IOHandler as TIdIOHandlerWebsocket +// else +// Assert(False); +end; + +function TIdHTTPWebsocketClient.GetSocketIO: TIdSocketIOHandling; +begin + Result := FSocketIO; +end; + +procedure TIdHTTPWebsocketClient.HeartBeatTimer(Sender: TObject); +begin + FHeartBeat.Enabled := False; + FSocketIO.Lock; + try + try + if (IOHandler <> nil) and + not IOHandler.ClosedGracefully and + IOHandler.Connected and + (FSocketIOContext <> nil) then + begin + FSocketIO.WritePing(FSocketIOContext as TSocketIOContext); //heartbeat socket.io message + end + //retry re-connect + else + try + //clear inputbuffer, otherwise it can't connect :( + if (IOHandler <> nil) and + not IOHandler.InputBufferIsEmpty + then + IOHandler.DiscardAll; + + Self.Connect; + TryUpgradeToWebsocket; + except + //skip, just retried + end; + except + //clear inputbuffer, otherwise it stays connected :( + if (IOHandler <> nil) and + not IOHandler.InputBufferIsEmpty + then + IOHandler.DiscardAll; + + if Assigned(OnDisConnected) then + OnDisConnected(Self); + try + raise EIdException.Create('Connection lost from ' + Format('ws://%s:%d/%s', [Host, Port, WSResourceName])); + except + //eat, no error popup! + end; + end; + finally + FSocketIO.UnLock; + FHeartBeat.Enabled := True; //always enable: in case of disconnect it will re-connect + end; +end; + +function TIdHTTPWebsocketClient.TryUpgradeToWebsocket: Boolean; +var + sError: string; +begin + InternalUpgradeToWebsocket(False{no raise}, sError); + Result := (sError = ''); +end; + +procedure TIdHTTPWebsocketClient.UpgradeToWebsocket; +var + sError: string; +begin + InternalUpgradeToWebsocket(True{raise}, sError); +end; + +procedure TIdHTTPWebsocketClient.InternalUpgradeToWebsocket(aRaiseException: Boolean; out aFailedReason: string); +var + sURL: string; + strmResponse: TMemoryStream; + i: Integer; + sKey, sResponseKey: string; + sSocketioextended: string; +begin + Assert(not IOHandler.IsWebsocket); + + strmResponse := TMemoryStream.Create; + try + //special socket.io handling, see https://github.com/LearnBoost/socket.io-spec + if SocketIOCompatible then + begin + Request.Clear; + Request.Connection := 'keep-alive'; + sURL := Format('http://%s:%d/socket.io/1/', [Host, Port]); + strmResponse.Clear; + + //get initial handshake + Post(sURL, strmResponse, strmResponse); + if ResponseCode = 200 {OK} then + begin + //if not Connected then //reconnect + // Self.Connect; + strmResponse.Position := 0; + //The body of the response should contain the session id (sid) given to the client, + //followed by the heartbeat timeout, the connection closing timeout, and the list of supported transports separated by : + //4d4f185e96a7b:15:10:websocket,xhr-polling + with TStreamReader.Create(strmResponse) do + try + FSocketIOHandshakeResponse := ReadToEnd; + finally + Free; + end; + sKey := Copy(FSocketIOHandshakeResponse, 1, Pos(':', FSocketIOHandshakeResponse)-1); + sSocketioextended := 'socket.io/1/websocket/' + sKey; + WSResourceName := sSocketioextended; + end + else + begin + aFailedReason := Format('Initial socket.io handshake failed: "%d: %s"',[ResponseCode, ResponseText]); + if aRaiseException then + raise EIdWebSocketHandleError.Create(aFailedReason); + end; + end; + + Request.Clear; + strmResponse.Clear; + //http://www.websocket.org/aboutwebsocket.html + (* GET ws://echo.websocket.org/?encoding=text HTTP/1.1 + Origin: http://websocket.org + Cookie: __utma=99as + Connection: Upgrade + Host: echo.websocket.org + Sec-WebSocket-Key: uRovscZjNol/umbTt5uKmw== + Upgrade: websocket + Sec-WebSocket-Version: 13 *) + + //Connection: Upgrade + Request.Connection := 'Upgrade'; + //Upgrade: websocket + Request.CustomHeaders.Add('Upgrade: websocket'); + + //Sec-WebSocket-Key + sKey := ''; + for i := 1 to 16 do + sKey := sKey + Char(Random(127-32) + 32); + //base64 encoded + sKey := TIdEncoderMIME.EncodeString(sKey); + Request.CustomHeaders.AddValue('Sec-WebSocket-Key', sKey); + //Sec-WebSocket-Version: 13 + Request.CustomHeaders.AddValue('Sec-WebSocket-Version', '13'); + + Request.Host := Format('Host: %s:%d',[Host,Port]); + //ws://host:port/ + //about resourcename, see: http://dev.w3.org/html5/websockets/ "Parsing WebSocket URLs" + //sURL := Format('ws://%s:%d/%s', [Host, Port, WSResourceName]); + sURL := Format('http://%s:%d/%s', [Host, Port, WSResourceName]); + ReadTimeout := Max(2 * 1000, ReadTimeout); + + { voorbeeld: + GET http://localhost:9222/devtools/page/642D7227-148E-47C2-B97A-E00850E3AFA3 HTTP/1.1 + Upgrade: websocket + Connection: Upgrade + Host: localhost:9222 + Origin: http://localhost:9222 + Pragma: no-cache + Cache-Control: no-cache + Sec-WebSocket-Key: HIqoAdZkxnWWH9dnVPyW7w== + Sec-WebSocket-Version: 13 + Sec-WebSocket-Extensions: x-webkit-deflate-frame + User-Agent: Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/27.0.1453.116 Safari/537.36 + Cookie: __utma=1.2040118404.1366961318.1366961318.1366961318.1; __utmc=1; __utmz=1.1366961318.1.1.utmcsr=(direct)|utmccn=(direct)|utmcmd=(none); deviceorder=0123456789101112; MultiTouchEnabled=false; device=3; network_type=0 + } + if SocketIOCompatible then + begin + Response.Clear; + Response.ResponseCode := 0; + Request.URL := sURL; + Request.Method := Id_HTTPMethodGet; + Request.Source := nil; + Response.ContentStream := strmResponse; + PrepareRequest(Request); + + //connect and upgrade + ConnectToHost(Request, Response); + + //check upgrade succesfull + CheckForGracefulDisconnect(True); + CheckConnected; + Assert(Self.Connected); + if Response.ResponseCode <> 200{ok} then + begin + aFailedReason := Format('Error while upgrading: "%d: %s"',[ResponseCode, ResponseText]); + if aRaiseException then + raise EIdWebSocketHandleError.Create(aFailedReason) + else + Exit; + end; + + Response.Clear; + if IOHandler.CheckForDataOnSource(ReadTimeout) then + Response.ResponseText := IOHandler.InputBufferAsString(); + //for now: timer in mainthread? + TThread.Queue(nil, + procedure + begin + FHeartBeat.Interval := 5 * 1000; + FHeartBeat.Enabled := True; + end); + end + else + begin + Get(sURL, strmResponse, [101]); + + //http://www.websocket.org/aboutwebsocket.html + (* HTTP/1.1 101 WebSocket Protocol Handshake + Date: Fri, 10 Feb 2012 17:38:18 GMT + Connection: Upgrade + Server: Kaazing Gateway + Upgrade: WebSocket + Access-Control-Allow-Origin: http://websocket.org + Access-Control-Allow-Credentials: true + Sec-WebSocket-Accept: rLHCkw/SKsO9GAH/ZSFhBATDKrU= + Access-Control-Allow-Headers: content-type *) + + //'HTTP/1.1 101 Switching Protocols' + if ResponseCode <> 101 then + begin + aFailedReason := Format('Error while upgrading: "%d: %s"',[ResponseCode, ResponseText]); + if aRaiseException then + raise EIdWebSocketHandleError.Create(aFailedReason); + end; + //connection: upgrade + if not SameText(Response.Connection, 'upgrade') then + begin + aFailedReason := Format('Connection not upgraded: "%s"',[Response.Connection]); + if aRaiseException then + raise EIdWebSocketHandleError.Create(aFailedReason); + end; + //upgrade: websocket + if not SameText(Response.RawHeaders.Values['upgrade'], 'websocket') then + begin + aFailedReason := Format('Not upgraded to websocket: "%s"',[Response.RawHeaders.Values['upgrade']]); + if aRaiseException then + raise EIdWebSocketHandleError.Create(aFailedReason); + end; + //check handshake key + sResponseKey := Trim(sKey) + //... "minus any leading and trailing whitespace" + '258EAFA5-E914-47DA-95CA-C5AB0DC85B11'; //special GUID + sResponseKey := TIdEncoderMIME.EncodeBytes( //Base64 + FHash.HashString(sResponseKey) ); //SHA1 + if not SameText(Response.RawHeaders.Values['sec-websocket-accept'], sResponseKey) then + begin + aFailedReason := 'Invalid key handshake'; + if aRaiseException then + raise EIdWebSocketHandleError.Create(aFailedReason); + end; + end; + + //upgrade succesful + IOHandler.IsWebsocket := True; + aFailedReason := ''; + + //always read the data! (e.g. RO use override of AsyncDispatchEvent to process data) + //if Assigned(OnBinData) or Assigned(OnTextData) then + TIdWebsocketMultiReadThread.Instance.AddClient(Self); + + if SocketIOCompatible then + begin +// if FSocketIOContext = nil then +// begin + FSocketIOContext := TSocketIOContext.Create(Self); +// IInterface(FSocketIOContext)._AddRef; +// end +// else +// FSocketIOContext.Create(Self); //update with new iohandler etc + FSocketIO.WriteConnect(FSocketIOContext as TSocketIOContext); + end; + finally + Request.Clear; + strmResponse.Free; + end; +end; + +function TIdHTTPWebsocketClient.MakeImplicitClientHandler: TIdIOHandler; +begin + Result := TIdIOHandlerWebsocket.Create(nil); +end; + +procedure TIdHTTPWebsocketClient.ResetChannel; +//var +// ws: TIdIOHandlerWebsocket; +begin + TIdWebsocketMultiReadThread.Instance.RemoveClient(Self); + + if IOHandler <> nil then + begin + IOHandler.InputBuffer.Clear; + //close/disconnect internal socket + //ws := IndyClient.IOHandler as TIdIOHandlerWebsocket; + //ws.Close; done in disconnect below + end; + Disconnect(False); +end; + +procedure TIdHTTPWebsocketClient.SetIOHandlerWS( + const Value: TIdIOHandlerWebsocket); +begin + SetIOHandler(Value); +end; + +procedure TIdHTTPWebsocketClient.SetOnData(const Value: TDataBinEvent); +begin +// if not Assigned(Value) and not Assigned(FOnTextData) then +// TIdWebsocketMultiReadThread.Instance.RemoveClient(Self); + + FOnData := Value; + + if Assigned(Value) and + (Self.IOHandler as TIdIOHandlerWebsocket).IsWebsocket + then + TIdWebsocketMultiReadThread.Instance.AddClient(Self); +end; + +procedure TIdHTTPWebsocketClient.SetOnTextData(const Value: TDataStringEvent); +begin +// if not Assigned(Value) and not Assigned(FOnData) then +// TIdWebsocketMultiReadThread.Instance.RemoveClient(Self); + + FOnTextData := Value; + + if Assigned(Value) and + (Self.IOHandler as TIdIOHandlerWebsocket).IsWebsocket + then + TIdWebsocketMultiReadThread.Instance.AddClient(Self); +end; + +{ TIdHTTPSocketIOClient } + +procedure TIdHTTPSocketIOClient_old.AfterConstruction; +begin + inherited; + SocketIOCompatible := True; + + FHeartBeat := TTimer.Create(nil); + FHeartBeat.Enabled := False; + FHeartBeat.OnTimer := HeartBeatTimer; +end; + +procedure TIdHTTPSocketIOClient_old.AsyncDispatchEvent(const aEvent: string); +begin + //https://github.com/LearnBoost/socket.io-spec + if StartsStr('1:', aEvent) then //connect + Exit; + if aEvent = '2::' then //ping, heartbeat + Exit; + inherited AsyncDispatchEvent(aEvent); +end; + +procedure TIdHTTPSocketIOClient_old.AutoConnect; +begin + //for now: timer in mainthread? + TThread.Queue(nil, + procedure + begin + FHeartBeat.Interval := 5 * 1000; + FHeartBeat.Enabled := True; + end); +end; + +destructor TIdHTTPSocketIOClient_old.Destroy; +var tmr: TObject; +begin + tmr := FHeartBeat; + TThread.Queue(nil, //otherwise free in other thread than created + procedure + begin + //FHeartBeat.Free; + tmr.Free; + end); + inherited; +end; + +procedure TIdHTTPSocketIOClient_old.HeartBeatTimer(Sender: TObject); +begin + FHeartBeat.Enabled := False; + try + try + if (IOHandler <> nil) and + not IOHandler.ClosedGracefully and + IOHandler.Connected then + begin + IOHandler.Write('2:::'); //heartbeat socket.io message + end + //retry connect + else + try + //clear inputbuffer, otherwise it can't connect :( + if (IOHandler <> nil) and + not IOHandler.InputBufferIsEmpty + then + IOHandler.DiscardAll; + + Self.Connect; + TryUpgradeToWebsocket; + except + //skip, just retried + end; + except + //clear inputbuffer, otherwise it stays connected :( + if (IOHandler <> nil) and + not IOHandler.InputBufferIsEmpty + then + IOHandler.DiscardAll; + + if Assigned(OnDisConnected) then + OnDisConnected(Self); + try + raise EIdException.Create('Connection lost from ' + Format('ws://%s:%d/%s', [Host, Port, WSResourceName])); + except + //eat, no error popup! + end; + end; + finally + FHeartBeat.Enabled := True; //always enable: in case of disconnect it will re-connect + end; +end; + +procedure TIdHTTPSocketIOClient_old.InternalUpgradeToWebsocket( + aRaiseException: Boolean; out aFailedReason: string); +var + stimeout: string; +begin + inherited InternalUpgradeToWebsocket(aRaiseException, aFailedReason); + + if (aFailedReason = '') and + (IOHandler as TIdIOHandlerWebsocket).IsWebsocket then + begin + stimeout := Copy(SocketIOHandshakeResponse, Pos(':', SocketIOHandshakeResponse)+1, Length(SocketIOHandshakeResponse)); + stimeout := Copy(stimeout, 1, Pos(':', stimeout)-1); + if stimeout <> '' then + begin + //if (FHeartBeat.Interval > 0) then + //for now: timer in mainthread? + TThread.Queue(nil, + procedure + begin + FHeartBeat.Interval := StrToIntDef(stimeout, 15) * 1000; + if FHeartBeat.Interval >= 15000 then + //FHeartBeat.Interval := FHeartBeat.Interval - 5000 + FHeartBeat.Interval := 5000 + else if FHeartBeat.Interval >= 5000 then + FHeartBeat.Interval := FHeartBeat.Interval - 2000; + + FHeartBeat.Enabled := (FHeartBeat.Interval > 0); + end); + end; + + if Assigned(OnConnected) then + OnConnected(Self); + end; +end; + +(*) +procedure TIdHTTPSocketIOClient_old.ProcessSocketIORequest( + const strmRequest: TStream); + + function __ReadToEnd: string; + var + utf8: TBytes; + ilength: Integer; + begin + Result := ''; + ilength := strmRequest.Size - strmRequest.Position; + SetLength(utf8, ilength); + strmRequest.Read(utf8[0], ilength); + Result := TEncoding.UTF8.GetString(utf8); + end; + + function __GetSocketIOPart(const aData: string; aIndex: Integer): string; + var ipos: Integer; + i: Integer; + begin + //'5::/chat:{"name":"hi!"}' + //0 = 5 + //1 = + //2 = /chat + //3 = {"name":"hi!"} + ipos := 0; + for i := 0 to aIndex-1 do + ipos := PosEx(':', aData, ipos+1); + if ipos >= 0 then + begin + Result := Copy(aData, ipos+1, Length(aData)); + if aIndex < 3 then // /chat:{"name":"hi!"}' + begin + ipos := PosEx(':', Result, 1); // :{"name":"hi!"}' + if ipos > 0 then + Result := Copy(Result, 1, ipos-1); // /chat + end; + end; + end; + +var + str, smsg, schannel, sdata: string; + imsg: Integer; +// bCallback: Boolean; +begin + str := __ReadToEnd; + if str = '' then Exit; + + //5:1+:/chat:test + smsg := __GetSocketIOPart(str, 1); + imsg := 0; +// bCallback := False; + if smsg <> '' then // 1+ + begin + imsg := StrToIntDef(ReplaceStr(smsg,'+',''), 0); // 1 +// bCallback := (Pos('+', smsg) > 1); //trailing +, e.g. 1+ + end; + schannel := __GetSocketIOPart(str, 2); // /chat + sdata := __GetSocketIOPart(str, 3); // test + + //(0) Disconnect + if StartsStr('0:', str) then + begin + schannel := __GetSocketIOPart(str, 2); + if schannel <> '' then + //todo: close channel + else + Self.Disconnect; + end + //(1) Connect + //'1::' [path] [query] + else if StartsStr('1:', str) then + begin + //todo: add channel/room to authorized channel/room list + Self.IOHandler.Write(str); //write same connect back, e.g. 1::/chat + end + //(2) Heartbeat + else if StartsStr('2:', str) then + begin + Self.IOHandler.Write(str); //write same connect back, e.g. 2:: + end + //(3) Message (https://github.com/LearnBoost/socket.io-spec#3-message) + //'3:' [message id ('+')] ':' [message endpoint] ':' [data] + //3::/chat:hi + else if StartsStr('3:', str) then + begin + if Assigned(OnSocketIOMsg) then + OnSocketIOMsg(Self, sdata, imsg); + end + //(4) JSON Message + //'4:' [message id ('+')] ':' [message endpoint] ':' [json] + //4:1::{"a":"b"} + else if StartsStr('4:', str) then + begin + if Assigned(OnSocketIOJson) then + OnSocketIOJson(Self, sdata, imsg); + end + //(5) Event + //'5:' [message id ('+')] ':' [message endpoint] ':' [json encoded event] + //5::/chat:{"name":"my other event","args":[{"my":"data"}]} + //5:1+:/chat:{"name":"GetLocations","args":[""]} + else if StartsStr('5:', str) then + begin + if Assigned(OnSocketIOEvent) then + OnSocketIOEvent(Self, sdata, imsg); + end + //(6) ACK + //6::/news:1+["callback"] + //6:::1+["Response"] + //(7) Error + //(8) Noop + else if StartsStr('8:', str) then + begin + //nothing + end + else + raise Exception.CreateFmt('Unsupported data: "%s"', [str]); +end; +*) + +{ TIdWebsocketMultiReadThread } + +procedure TIdWebsocketMultiReadThread.AddClient( + aChannel: TIdHTTPWebsocketClient); +var l: TList; +begin + Assert( (aChannel.IOHandler as TIdIOHandlerWebsocket).IsWebsocket, 'Channel is not a websocket'); + + l := FChannels.LockList; + try + //already exists? + if l.IndexOf(aChannel) >= 0 then Exit; + + Assert(l.Count < 64, 'Max 64 connections can be handled by one read thread!'); //due to restrictions of the "select" API + l.Add(aChannel); + + //trigger the "select" wait + BreakSelectWait; + finally + FChannels.UnlockList; + end; +end; + +procedure TIdWebsocketMultiReadThread.AfterConstruction; +begin + inherited; + FChannels := TThreadList.Create; + FillChar(Freadset, SizeOf(Freadset), 0); + FillChar(Fexceptionset, SizeOf(Fexceptionset), 0); + + InitSpecialEventSocket; +end; + +procedure TIdWebsocketMultiReadThread.BreakSelectWait; +var + iResult: Integer; + LAddr: TSockAddrIn6; +begin + FillChar(LAddr, SizeOf(LAddr), 0); + //Id_IPv4 + with PSOCKADDR(@LAddr)^ do + begin + sin_family := Id_PF_INET4; + //dummy address and port + (GStack as TIdStackBSDBase).TranslateStringToTInAddr('0.0.0.0', sin_addr, Id_IPv4); + sin_port := htons(1); + end; + + FPendingBreak := True; + + //connect to non-existing address to stop "select" from waiting + //Note: this is some kind of "hack" because there is no nice way to stop it + //The only(?) other possibility is to make a "socket pair" and send a byte to it, + //but this requires a dynamic server socket (which can trigger a firewall + //exception/question popup in WindowsXP+) + iResult := IdWinsock2.connect(FTempHandle, PSOCKADDR(@LAddr), SIZE_TSOCKADDRIN); + //non blocking socket, so will always result in "would block"! + if (iResult <> Id_SOCKET_ERROR) or + ( (GStack <> nil) and (GStack.WSGetLastError <> WSAEWOULDBLOCK) ) + then + GStack.CheckForSocketError(iResult); +end; + +destructor TIdWebsocketMultiReadThread.Destroy; +begin + IdWinsock2.closesocket(FTempHandle); + FChannels.Free; + inherited; +end; + +procedure TIdWebsocketMultiReadThread.Execute; +begin + Self.NameThreadForDebugging(AnsiString(Self.ClassName)); + + while not Terminated do + begin + try + while not Terminated do + ReadFromAllChannels; + except + //continue + end; + end; +end; + +procedure TIdWebsocketMultiReadThread.InitSpecialEventSocket; +var + param: Cardinal; + iResult: Integer; +begin + if GStack = nil then Exit; //finalized? + + //alloc socket + FTempHandle := GStack.NewSocketHandle(Id_SOCK_STREAM, Id_IPPROTO_IP, Id_IPv4, False); + Assert(FTempHandle <> Id_INVALID_SOCKET); + //non block mode + param := 1; // enable NON blocking mode + iResult := ioctlsocket(FTempHandle, FIONBIO, param); + GStack.CheckForSocketError(iResult); +end; + +class function TIdWebsocketMultiReadThread.Instance: TIdWebsocketMultiReadThread; +begin + if (FInstance = nil) and + not TFinalizationHelper.ApplicationIsTerminating then + begin + FInstance := TIdWebsocketMultiReadThread.Create(True); + FInstance.Start; + end; + Result := FInstance; +end; + +procedure TIdWebsocketMultiReadThread.ReadFromAllChannels; +var + l: TList; + chn: TIdHTTPWebsocketClient; + iCount, + i: Integer; + iResult: NativeInt; + strmEvent: TMemoryStream; + swstext: utf8string; + ws: TIdIOHandlerWebsocket; + wscode: TWSDataCode; +begin + l := FChannels.LockList; + try + iCount := 0; + Freadset.fd_count := iCount; + + for i := 0 to l.Count - 1 do + begin + chn := TIdHTTPWebsocketClient(l.Items[i]); + //valid? + if //not chn.Busy and also take busy channels (will be ignored later), otherwise we have to break/reset for each RO function execution + (chn.Socket.Binding.Handle > 0) and + (chn.Socket.Binding.Handle <> INVALID_SOCKET) then + begin + Freadset.fd_count := iCount+1; + Freadset.fd_array[iCount] := chn.Socket.Binding.Handle; + Inc(iCount); + end; + end; + + if FPendingBreak then + ResetSpecialEventSocket; + finally + FChannels.UnlockList; + end; + + //special helper socket to be able to stop "select" from waiting + Fexceptionset.fd_count := 1; + Fexceptionset.fd_array[0] := FTempHandle; + + //wait 15s till some data + Finterval.tv_sec := 15; //15s + Finterval.tv_usec := 0; + + //nothing to wait for? then sleep some time to prevent 100% CPU + if iCount = 0 then + begin + iResult := IdWinsock2.select(0, nil, nil, @Fexceptionset, @Finterval); + if iResult = SOCKET_ERROR then + iResult := 1; //ignore errors + end + //wait till a socket has some data (or a signal via exceptionset is fired) + else + iResult := IdWinsock2.select(0, @Freadset, nil, @Fexceptionset, @Finterval); + + if iResult = SOCKET_ERROR then + //raise EIdWinsockStubError.Build(WSAGetLastError, '', []); + //ignore error during wait: socket disconnected etc + Exit; + + if Terminated then Exit; + + //some data? + if (iResult > 0) then + begin + strmEvent := nil; + + l := FChannels.LockList; + try + //check for data for all channels + for i := 0 to l.Count - 1 do + begin + chn := TIdHTTPWebsocketClient(l.Items[i]); + try + //try to process all events + while chn.IOHandler.Readable(0) do //has some data + begin + ws := chn.IOHandler as TIdIOHandlerWebsocket; + //no pending dispatch active? (so actually we only read events here?) + if ws.TryLock then + begin + try + if strmEvent = nil then + strmEvent := TMemoryStream.Create; + strmEvent.Clear; + + //first is the data type TWSDataType(text or bin), but is ignore/not needed + wscode := TWSDataCode(chn.IOHandler.ReadLongWord); + //next the size + data = stream + chn.IOHandler.ReadStream(strmEvent); + + //ignore ping/pong messages + if wscode in [wdcPing, wdcPong] then Continue; + finally + ws.Unlock; + end; + + //fire event + //offload event dispatching to different thread! otherwise deadlocks possible? (do to synchronize) + strmEvent.Position := 0; + if wscode = wdcBinary then + begin + chn.AsyncDispatchEvent(strmEvent); + end + else if wscode = wdcText then + begin + SetLength(swstext, strmEvent.Size); + strmEvent.Read(swstext[1], strmEvent.Size); + if swstext <> '' then + begin + chn.AsyncDispatchEvent(string(swstext)); + end; + end; + end; + end; + except + l := nil; + FChannels.UnlockList; + chn.ResetChannel; + raise; + end; + end; + + if FPendingBreak then + ResetSpecialEventSocket; + finally + if l <> nil then + FChannels.UnlockList; + strmEvent.Free; + end; + end; +end; + +procedure TIdWebsocketMultiReadThread.RemoveClient( + aChannel: TIdHTTPWebsocketClient); +begin + if Self = nil then Exit; + FChannels.Remove(aChannel); + BreakSelectWait; +end; + +class procedure TIdWebsocketMultiReadThread.RemoveInstance; +begin + if FInstance <> nil then + FreeAndNil(FInstance); +end; + +procedure TIdWebsocketMultiReadThread.ResetSpecialEventSocket; +begin + Assert(FPendingBreak); + FPendingBreak := False; + + IdWinsock2.closesocket(FTempHandle); + InitSpecialEventSocket; +end; + +procedure TIdWebsocketMultiReadThread.Terminate; +begin + inherited Terminate; + + FChannels.LockList; + try + //fire a signal, so the "select" wait will quit and thread can stop + BreakSelectWait; + finally + FChannels.UnlockList; + end; +end; + +{ TIdWebsocketDispatchThread } + +procedure TIdWebsocketDispatchThread.AfterConstruction; +begin + inherited; + FEvents := TList.Create; + FProcessing := TList.Create; + FEvent := TEvent.Create; +end; + +destructor TIdWebsocketDispatchThread.Destroy; +begin + System.TMonitor.Enter(FEvents); + FEvents.Clear; + FEvents.Free; + FProcessing.Free; + + FEvent.Free; + inherited; +end; + +procedure TIdWebsocketDispatchThread.Execute; +var + proc: Classes.TThreadProcedure; +begin + while not Terminated do + begin + try + if FEvent.WaitFor(3 * 1000) = wrSignaled then + begin + FEvent.ResetEvent; + System.TMonitor.Enter(FEvents); + try + //copy + while FEvents.Count > 0 do + begin + proc := FEvents.Items[0]; + FProcessing.Add(proc); + FEvents.Delete(0); + end; + finally + System.TMonitor.Exit(FEvents); + end; + end; + + while FProcessing.Count > 0 do + begin + proc := FProcessing.Items[0]; + FProcessing.Delete(0); + proc(); + end; + except + //continue + end; + end; +end; + +class function TIdWebsocketDispatchThread.Instance: TIdWebsocketDispatchThread; +begin + if FInstance = nil then + begin + FInstance := TIdWebsocketDispatchThread.Create(True); + FInstance.Start; + end; + Result := FInstance; +end; + +procedure TIdWebsocketDispatchThread.QueueEvent(aEvent: TThreadProcedure); +begin + System.TMonitor.Enter(FEvents); + try + FEvents.Add(aEvent); + finally + System.TMonitor.Exit(FEvents); + end; + FEvent.SetEvent; +end; + +procedure TIdWebsocketDispatchThread.Terminate; +begin + inherited Terminate; + FEvent.SetEvent; +end; + +initialization +finalization + if TIdWebsocketMultiReadThread.FInstance <> nil then + begin + TIdWebsocketMultiReadThread.Instance.Terminate; + TBaseNamedThread.WaitForThread(TIdWebsocketMultiReadThread.Instance, 5 * 1000); + TIdWebsocketMultiReadThread.RemoveInstance; + end; + +end. diff --git a/IdIOHandlerWebsocket.pas b/IdIOHandlerWebsocket.pas new file mode 100644 index 0000000..300a24c --- /dev/null +++ b/IdIOHandlerWebsocket.pas @@ -0,0 +1,771 @@ +unit IdIOHandlerWebsocket; + +//The WebSocket Protocol, RFC 6455 +//http://datatracker.ietf.org/doc/rfc6455/?include_text=1 + +interface + +uses + Classes, + IdIOHandlerStack, IdGlobal, IdException, IdBuffer, SyncObjs, + Generics.Collections; + +type + TWSDataType = (wdtText, wdtBinary); + TWSDataCode = (wdcNone, wdcContinuation, wdcText, wdcBinary, wdcClose, wdcPing, wdcPong); + TWSExtensionBit = (webBit1, webBit2, webBit3); + TWSExtensionBits = set of TWSExtensionBit; + + TIdIOHandlerWebsocket = class; + EIdWebSocketHandleError = class(EIdSocketHandleError); + + TIdIOHandlerWebsocket = class(TIdIOHandlerStack) + private + FIsServerSide: Boolean; + FBusyUpgrading: Boolean; + FIsWebsocket: Boolean; + FWSInputBuffer: TIdBuffer; + FExtensionBits: TWSExtensionBits; + FLock: TCriticalSection; + FCloseReason: string; + FCloseCode: Integer; + FClosing: Boolean; + protected + FMessageStream: TMemoryStream; + FWriteTextToTarget: Boolean; + FCloseCodeSend: Boolean; + + function InternalReadDataFromSource(var VBuffer: TIdBytes): Integer; + function ReadDataFromSource(var VBuffer: TIdBytes): Integer; override; + function WriteDataToTarget (const ABuffer: TIdBytes; const AOffset, ALength: Integer): Integer; override; + + function ReadFrame(out aFIN, aRSV1, aRSV2, aRSV3: boolean; out aDataCode: TWSDataCode; out aData: TIdBytes): Integer; + function ReadMessage(var aBuffer: TIdBytes; out aDataCode: TWSDataCode): Integer; + public + function WriteData(aData: TIdBytes; aType: TWSDataCode; + aFIN: boolean = true; aRSV1: boolean = false; aRSV2: boolean = false; aRSV3: boolean = false): integer; + property BusyUpgrading : Boolean read FBusyUpgrading write FBusyUpgrading; + property IsWebsocket : Boolean read FIsWebsocket write FIsWebsocket; + property IsServerSide : Boolean read FIsServerSide write FIsServerSide; + property ClientExtensionBits : TWSExtensionBits read FExtensionBits write FExtensionBits; + public + procedure AfterConstruction;override; + destructor Destroy; override; + + procedure Lock; + procedure Unlock; + function TryLock: Boolean; + + procedure Close; override; + property Closing : Boolean read FClosing; + property CloseCode : Integer read FCloseCode write FCloseCode; + property CloseReason: string read FCloseReason write FCloseReason; + + //text/string writes + procedure Write(const AOut: string; AEncoding: TIdTextEncoding = nil); overload; override; + procedure WriteLn(const AOut: string; AEncoding: TIdTextEncoding = nil); overload; override; + procedure WriteLnRFC(const AOut: string = ''; AEncoding: TIdTextEncoding = nil); override; + procedure Write(AValue: TStrings; AWriteLinesCount: Boolean = False; AEncoding: TIdTextEncoding = nil); overload; override; + procedure Write(AStream: TStream; aType: TWSDataType); overload; + end; + +//close frame codes +const + C_FrameClose_Normal = 1000; //1000 indicates a normal closure, meaning that the purpose for + //which the connection was established has been fulfilled. + C_FrameClose_GoingAway = 1001; //1001 indicates that an endpoint is "going away", such as a server + //going down or a browser having navigated away from a page. + C_FrameClose_ProtocolError = 1002; //1002 indicates that an endpoint is terminating the connection due + //to a protocol error. + C_FrameClose_UnhandledDataType = 1003; //1003 indicates that an endpoint is terminating the connection + //because it has received a type of data it cannot accept (e.g., an + //endpoint that understands only text data MAY send this if it + //receives a binary message). + C_FrameClose_Reserved = 1004; //Reserved. The specific meaning might be defined in the future. + C_FrameClose_ReservedNoStatus = 1005; //1005 is a reserved value and MUST NOT be set as a status code in a + //Close control frame by an endpoint. It is designated for use in + //applications expecting a status code to indicate that no status + //code was actually present. + C_FrameClose_ReservedAbnormal = 1006; //1006 is a reserved value and MUST NOT be set as a status code in a + //Close control frame by an endpoint. It is designated for use in + //applications expecting a status code to indicate that the + //connection was closed abnormally, e.g., without sending or + //receiving a Close control frame. + C_FrameClose_InconsistentData = 1007; //1007 indicates that an endpoint is terminating the connection + //because it has received data within a message that was not + //consistent with the type of the message (e.g., non-UTF-8 [RFC3629] + //data within a text message). + C_FrameClose_PolicyError = 1008; //1008 indicates that an endpoint is terminating the connection + //because it has received a message that violates its policy. This + //is a generic status code that can be returned when there is no + //other more suitable status code (e.g., 1003 or 1009) or if there + //is a need to hide specific details about the policy. + C_FrameClose_ToBigMessage = 1009; //1009 indicates that an endpoint is terminating the connection + //because it has received a message that is too big for it to process. + C_FrameClose_MissingExtenstion = 1010; //1010 indicates that an endpoint (client) is terminating the + //connection because it has expected the server to negotiate one or + //more extension, but the server didn't return them in the response + //message of the WebSocket handshake. The list of extensions that + //are needed SHOULD appear in the /reason/ part of the Close frame. + //Note that this status code is not used by the server, because it + //can fail the WebSocket handshake instead. + C_FrameClose_UnExpectedError = 1011; //1011 indicates that a server is terminating the connection because + //it encountered an unexpected condition that prevented it from + //fulfilling the request. + C_FrameClose_ReservedTLSError = 1015; //1015 is a reserved value and MUST NOT be set as a status code in a + //Close control frame by an endpoint. It is designated for use in + //applications expecting a status code to indicate that the + //connection was closed due to a failure to perform a TLS handshake + //(e.g., the server certificate can't be verified). + +implementation + +uses + SysUtils, Math, IdStream, IdStack, IdWinsock2, IdExceptionCore, + IdResourceStrings, IdResourceStringsCore; + +//frame codes +const + C_FrameCode_Continuation = 0; + C_FrameCode_Text = 1; + C_FrameCode_Binary = 2; + //3-7 are reserved for further non-control frames + C_FrameCode_Close = 8; + C_FrameCode_Ping = 9; + C_FrameCode_Pong = 10 {A}; + //B-F are reserved for further control frames + +{ TIdIOHandlerStack_Websocket } + +procedure TIdIOHandlerWebsocket.AfterConstruction; +begin + inherited; + FMessageStream := TMemoryStream.Create; + FWSInputBuffer := TIdBuffer.Create; + FLock := TCriticalSection.Create; +end; + +procedure TIdIOHandlerWebsocket.Close; +var + iaWriteBuffer: TIdBytes; + sReason: UTF8String; + iOptVal, iOptLen: Integer; + bConnected: Boolean; +begin + try + //valid connection? + bConnected := Opened and + SourceIsAvailable and + not ClosedGracefully; + + //no socket error? connection closed by software abort, connection reset by peer, etc + iOptLen := SIZE_INTEGER; + bConnected := bConnected and + (IdWinsock2.getsockopt(Self.Binding.Handle, SOL_SOCKET, SO_ERROR, PAnsiChar(@iOptVal), iOptLen) = 0) and + (iOptVal = 0); + + if bConnected and IsWebsocket then + begin + //close message must be responded with a close message back + //or initiated with a close message + if not FCloseCodeSend then + begin + FCloseCodeSend := True; + + //we initiate the close? then write reason etc + if not Closing then + begin + SetLength(iaWriteBuffer, 2); + if CloseCode < C_FrameClose_Normal then + CloseCode := C_FrameClose_Normal; + iaWriteBuffer[0] := Byte(CloseCode shr 8); + iaWriteBuffer[1] := Byte(CloseCode); + if CloseReason <> '' then + begin + sReason := utf8string(CloseReason); + SetLength(iaWriteBuffer, Length(iaWriteBuffer) + Length(sReason)); + Move(sReason[1], iaWriteBuffer[2], Length(sReason)); + end; + end + else + begin + //just send normal close response back + SetLength(iaWriteBuffer, 2); + iaWriteBuffer[0] := Byte(C_FrameClose_Normal shr 8); + iaWriteBuffer[1] := Byte(C_FrameClose_Normal); + end; + + WriteData(iaWriteBuffer, wdcClose); //send close + code back + end; + + //we did initiate the close? then wait (a little) for close response + if not Closing then + begin + FClosing := True; + CheckForDisconnect(); + //wait till client respond with close message back + //but a pending message can be in the buffer, so process this too + while ReadFromSource(False{no disconnect error}, 1 * 1000, False) > 0 do ; //response within 1s? + end; + end; + except + //ignore, it's possible that the client is disconnected already (crashed etc) + end; + + IsWebsocket := False; + BusyUpgrading := False; + inherited Close; +end; + +destructor TIdIOHandlerWebsocket.Destroy; +begin + FLock.Enter; + FLock.Free; + + FWSInputBuffer.Free; + FMessageStream.Free; + inherited; +end; + +function TIdIOHandlerWebsocket.InternalReadDataFromSource( + var VBuffer: TIdBytes): Integer; +begin + CheckForDisconnect; + if not Readable(ReadTimeout) or + not Opened or + not SourceIsAvailable then + begin + CheckForDisconnect; //disconnected during wait in "Readable()"? + if not Opened then + EIdNotConnected.Toss(RSNotConnected) + else if not SourceIsAvailable then + EIdClosedSocket.Toss(RSStatusDisconnected); + GStack.CheckForSocketError(GStack.WSGetLastError); //check for socket error + EIdReadTimeout.Toss(RSIdNoDataToRead); //exit, no data can be received + end; + + SetLength(VBuffer, RecvBufferSize); + Result := inherited ReadDataFromSource(VBuffer); + if Result = 0 then + begin + CheckForDisconnect; //disconnected in the mean time? + GStack.CheckForSocketError(GStack.WSGetLastError); //check for socket error + EIdNoDataToRead.Toss(RSIdNoDataToRead); //nothing read? then connection is probably closed -> exit + end; + SetLength(VBuffer, Result); +end; + +procedure TIdIOHandlerWebsocket.WriteLn(const AOut: string; + AEncoding: TIdTextEncoding); +begin + FWriteTextToTarget := True; + try + inherited WriteLn(AOut, TIdTextEncoding.UTF8); //must be UTF8! + finally + FWriteTextToTarget := False; + end; +end; + +procedure TIdIOHandlerWebsocket.WriteLnRFC(const AOut: string; + AEncoding: TIdTextEncoding); +begin + FWriteTextToTarget := True; + try + inherited WriteLnRFC(AOut, TIdTextEncoding.UTF8); //must be UTF8! + finally + FWriteTextToTarget := False; + end; +end; + +procedure TIdIOHandlerWebsocket.Write(const AOut: string; + AEncoding: TIdTextEncoding); +begin + FWriteTextToTarget := True; + try + inherited Write(AOut, TIdTextEncoding.UTF8); //must be UTF8! + finally + FWriteTextToTarget := False; + end; +end; + +procedure TIdIOHandlerWebsocket.Write(AValue: TStrings; + AWriteLinesCount: Boolean; AEncoding: TIdTextEncoding); +begin + FWriteTextToTarget := True; + try + inherited Write(AValue, AWriteLinesCount, TIdTextEncoding.UTF8); //must be UTF8! + finally + FWriteTextToTarget := False; + end; +end; + +procedure TIdIOHandlerWebsocket.Write(AStream: TStream; + aType: TWSDataType); +begin + FWriteTextToTarget := (aType = wdtText); + try + inherited Write(AStream); + finally + FWriteTextToTarget := False; + end; +end; + +function TIdIOHandlerWebsocket.WriteDataToTarget(const ABuffer: TIdBytes; + const AOffset, ALength: Integer): Integer; +begin + if not IsWebsocket then + Result := inherited WriteDataToTarget(ABuffer, AOffset, ALength) + else + begin + Lock; + try + if FWriteTextToTarget then + Result := WriteData(ABuffer, wdcText, True{send all at once}, + webBit1 in ClientExtensionBits, webBit2 in ClientExtensionBits, webBit3 in ClientExtensionBits) + else + Result := WriteData(ABuffer, wdcBinary, True{send all at once}, + webBit1 in ClientExtensionBits, webBit2 in ClientExtensionBits, webBit3 in ClientExtensionBits); + except + Unlock; //always unlock when socket exception + FClosedGracefully := True; + Raise; + end; + Unlock; //normal unlock (no double try finally) + end; +end; + +function TIdIOHandlerWebsocket.ReadDataFromSource( + var VBuffer: TIdBytes): Integer; +var + wscode: TWSDataCode; +begin + //the first time something is read AFTER upgrading, we switch to WS + //(so partial writes can be done, till a read is done) + if BusyUpgrading then + begin + BusyUpgrading := False; + IsWebsocket := True; + end; + + if not IsWebsocket then + Result := inherited ReadDataFromSource(VBuffer) + else + begin + Lock; + try + //we wait till we have a full message here (can be fragmented in several frames) + Result := ReadMessage(VBuffer, wscode); + + //first write the data code (text or binary, ping, pong) + FInputBuffer.Write(LongWord(Ord(wscode))); + //we write message size here, vbuffer is written after this. This way we can use ReadStream to get 1 single message (in case multiple messages in FInputBuffer) + if LargeStream then + FInputBuffer.Write(Int64(Result)) + else + FInputBuffer.Write(LongWord(Result)) + except + Unlock; //always unlock when socket exception + FClosedGracefully := True; //closed (but not gracefully?) + Raise; + end; + Unlock; //normal unlock (no double try finally) + end; +end; + +function TIdIOHandlerWebsocket.ReadMessage(var aBuffer: TIdBytes; out aDataCode: TWSDataCode): Integer; +var + iReadCount: Integer; + iaReadBuffer: TIdBytes; + bFIN, bRSV1, bRSV2, bRSV3: boolean; + lDataCode: TWSDataCode; + lFirstDataCode: TWSDataCode; +// closeCode: integer; +// closeResult: string; +begin + Result := 0; + (* ...all fragments of a message are of + the same type, as set by the first fragment's opcode. Since + control frames cannot be fragmented, the type for all fragments in + a message MUST be either text, binary, or one of the reserved + opcodes. *) + lFirstDataCode := wdcNone; + FMessageStream.Clear; + repeat + //read a single frame + iReadCount := ReadFrame(bFIN, bRSV1, bRSV2, bRSV3, lDataCode, iaReadBuffer); + if (iReadCount > 0) or + (lDataCode <> wdcNone) then + begin + Assert(Length(iaReadBuffer) = iReadCount); + + //store client extension bits + if Self.IsServerSide then + begin + ClientExtensionBits := []; + if bRSV1 then ClientExtensionBits := ClientExtensionBits + [webBit1]; + if bRSV2 then ClientExtensionBits := ClientExtensionBits + [webBit2]; + if bRSV3 then ClientExtensionBits := ClientExtensionBits + [webBit3]; + end; + + //process frame + case lDataCode of + wdcText, wdcBinary: + begin + if lFirstDataCode <> wdcNone then + raise EIdWebSocketHandleError.Create('Invalid frame: specified data code only allowed for the first frame'); + lFirstDataCode := lDataCode; + + FMessageStream.Clear; + TIdStreamHelper.Write(FMessageStream, iaReadBuffer); + end; + wdcContinuation: + begin + if not (lFirstDataCode in [wdcText, wdcBinary]) then + raise EIdWebSocketHandleError.Create('Invalid frame continuation'); + TIdStreamHelper.Write(FMessageStream, iaReadBuffer); + end; + wdcClose: + begin + FCloseCode := C_FrameClose_Normal; + //"If there is a body, the first two bytes of the body MUST be a 2-byte + // unsigned integer (in network byte order) representing a status code" + if Length(iaReadBuffer) > 1 then + begin + FCloseCode := (iaReadBuffer[0] shl 8) + + iaReadBuffer[1]; + if Length(iaReadBuffer) > 2 then + FCloseReason := BytesToString(iaReadBuffer, 2, Length(iaReadBuffer), TEncoding.UTF8); + end; + + FClosing := True; + Self.Close; + end; + //Note: control frames can be send between fragmented frames + wdcPing: + begin + WriteData(iaReadBuffer, wdcPong); //send pong + same data back + lFirstDataCode := lDataCode; + //bFIN := False; //ignore ping when we wait for data? + end; + wdcPong: + begin + //pong received, ignore; + lFirstDataCode := lDataCode; + end; + end; + end + else + Break; + until bFIN; + + //done? + if bFIN then + begin + if (lFirstDataCode in [wdcText, wdcBinary]) then + begin + //result + FMessageStream.Position := 0; + TIdStreamHelper.ReadBytes(FMessageStream, aBuffer); + Result := FMessageStream.Size; + aDataCode := lFirstDataCode + end + else if (lFirstDataCode in [wdcPing, wdcPong]) then + begin + //result + FMessageStream.Position := 0; + TIdStreamHelper.ReadBytes(FMessageStream, aBuffer); + SetLength(aBuffer, FMessageStream.Size); + //dummy data: there *must* be some data read otherwise connection is closed by Indy! + if Length(aBuffer) <= 0 then + begin + SetLength(aBuffer, 1); + aBuffer[0] := Ord(lFirstDataCode); + end; + + Result := Length(aBuffer); + aDataCode := lFirstDataCode + end; + end; +end; + +procedure TIdIOHandlerWebsocket.Lock; +begin + FLock.Enter; +end; + +function TIdIOHandlerWebsocket.TryLock: Boolean; +begin + Result := FLock.TryEnter; +end; + +procedure TIdIOHandlerWebsocket.Unlock; +begin + FLock.Leave; +end; + +function TIdIOHandlerWebsocket.ReadFrame(out aFIN, aRSV1, aRSV2, aRSV3: boolean; + out aDataCode: TWSDataCode; out aData: TIdBytes): Integer; +var + iInputPos: NativeInt; + + function _GetByte: Byte; + var + temp: TIdBytes; + begin + while FWSInputBuffer.Size <= iInputPos do + begin + //FWSInputBuffer.AsString; + InternalReadDataFromSource(temp); + FWSInputBuffer.Write(temp); + end; + + //Self.ReadByte copies all data everytime (because the first byte must be removed) so we use index (much more efficient) + Result := FWSInputBuffer.PeekByte(iInputPos); + //FWSInputBuffer.AsString + inc(iInputPos); + end; + + function _GetBytes(aCount: Integer): TIdBytes; + var + temp: TIdBytes; + begin + while FWSInputBuffer.Size < aCount do + begin + InternalReadDataFromSource(temp); + FWSInputBuffer.Write(temp); + end; + + FWSInputBuffer.ExtractToBytes(Result, aCount); + end; + +var + iByte: Byte; + i, iCode: NativeInt; + bHasMask: boolean; + iDataLength, iPos: Int64; + rMask: record + case Boolean of + True : (MaskAsBytes: array[0..3] of Byte); + False: (MaskAsInt : Int32); + end; +begin + iInputPos := 0; + SetLength(aData, 0); + aDataCode := wdcNone; + + //wait + process data + iByte := _GetByte; + (* 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 (nr) + 7 6 5 4 3 2 1 0|7 6 5 4 3 2 1 0|7 6 5 4 3 2 1 0|7 6 5 4 3 2 1 0 (bit) + +-+-+-+-+-------+-+-------------+-------------------------------+ + |F|R|R|R| opcode|M| Payload len | Extended payload length | + |I|S|S|S| (4) |A| (7) | (16/64) | + |N|V|V|V| |S| | (if payload len==126/127) | + | |1|2|3| |K| | | + +-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - + *) + //FIN, RSV1, RSV2, RSV3: 1 bit each + aFIN := (iByte and (1 shl 7)) > 0; + aRSV1 := (iByte and (1 shl 6)) > 0; + aRSV2 := (iByte and (1 shl 5)) > 0; + aRSV3 := (iByte and (1 shl 4)) > 0; + //Opcode: 4 bits + iCode := (iByte and $0F); //clear 4 MSB's + case iCode of + C_FrameCode_Continuation: aDataCode := wdcContinuation; + C_FrameCode_Text: aDataCode := wdcText; + C_FrameCode_Binary: aDataCode := wdcBinary; + C_FrameCode_Close: aDataCode := wdcClose; + C_FrameCode_Ping: aDataCode := wdcPing; + C_FrameCode_Pong: aDataCode := wdcPong; + else + raise EIdException.CreateFmt('Unsupported data code: %d', [iCode]); + end; + + //Mask: 1 bit + iByte := _GetByte; + bHasMask := (iByte and (1 shl 7)) > 0; + //Length (7 bits or 7+16 bits or 7+64 bits) + iDataLength := (iByte and $7F); //clear 1 MSB + //Extended payload length? + //If 126, the following 2 bytes interpreted as a 16-bit unsigned integer are the payload length + if (iDataLength = 126) then + begin + iByte := _GetByte; + iDataLength := (iByte shl 8); //8 MSB + iByte := _GetByte; + iDataLength := iDataLength + iByte; + end + //If 127, the following 8 bytes interpreted as a 64-bit unsigned integer (the most significant bit MUST be 0) are the payload length + else if (iDataLength = 127) then + begin + iDataLength := 0; + for i := 7 downto 0 do //read 8 bytes in reverse order + begin + iByte := _GetByte; + iDataLength := iDataLength + + (Int64(iByte) shl (8 * i)); //shift bits to left to recreate 64bit integer + end; + Assert(iDataLength > 0); + end; + + //"All frames sent from client to server must have this bit set to 1" + if IsServerSide and not bHasMask then + raise EIdWebSocketHandleError.Create('No mask supplied: mask is required for clients when sending data to server') + else if not IsServerSide and bHasMask then + raise EIdWebSocketHandleError.Create('Mask supplied but mask is not allowed for servers when sending data to clients'); + + //Masking-key: 0 or 4 bytes + if bHasMask then + begin + rMask.MaskAsBytes[0] := _GetByte; + rMask.MaskAsBytes[1] := _GetByte; + rMask.MaskAsBytes[2] := _GetByte; + rMask.MaskAsBytes[3] := _GetByte; + end; + //Payload data: (x+y) bytes + FWSInputBuffer.Remove(iInputPos); //remove first couple of processed bytes (header) + //simple read? + if not bHasMask then + aData := _GetBytes(iDataLength) + else + //reverse mask + begin + aData := _GetBytes(iDataLength); + iPos := 0; + while iPos < iDataLength do + begin + aData[iPos] := aData[iPos] xor + rMask.MaskAsBytes[iPos mod 4]; //apply mask + inc(iPos); + end; + end; + + Result := Length(aData); +end; + +function TIdIOHandlerWebsocket.WriteData(aData: TIdBytes; + aType: TWSDataCode; aFIN, aRSV1, aRSV2, aRSV3: boolean): integer; +var + iByte: Byte; + i: NativeInt; + iDataLength, iPos: Int64; + rLength: Int64Rec; + rMask: record + case Boolean of + True : (MaskAsBytes: array[0..3] of Byte); + False: (MaskAsInt : Int32); + end; + strmData: TMemoryStream; + bData: TBytes; +begin + Result := 0; + Assert(Binding <> nil); + + strmData := TMemoryStream.Create; + try + (* 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 (nr) + 7 6 5 4 3 2 1 0|7 6 5 4 3 2 1 0|7 6 5 4 3 2 1 0|7 6 5 4 3 2 1 0 (bit) + +-+-+-+-+-------+-+-------------+-------------------------------+ + |F|R|R|R| opcode|M| Payload len | Extended payload length | + |I|S|S|S| (4) |A| (7) | (16/64) | + |N|V|V|V| |S| | (if payload len==126/127) | + | |1|2|3| |K| | | + +-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - + *) + //FIN, RSV1, RSV2, RSV3: 1 bit each + if aFIN then iByte := (1 shl 7); + if aRSV1 then iByte := iByte + (1 shl 6); + if aRSV2 then iByte := iByte + (1 shl 5); + if aRSV3 then iByte := iByte + (1 shl 4); + //Opcode: 4 bits + case aType of + wdcContinuation : iByte := iByte + C_FrameCode_Continuation; + wdcText : iByte := iByte + C_FrameCode_Text; + wdcBinary : iByte := iByte + C_FrameCode_Binary; + wdcClose : iByte := iByte + C_FrameCode_Close; + wdcPing : iByte := iByte + C_FrameCode_Ping; + wdcPong : iByte := iByte + C_FrameCode_Pong; + else + raise EIdException.CreateFmt('Unsupported data code: %d', [Ord(aType)]); + end; + strmData.Write(iByte, SizeOf(iByte)); + + iByte := 0; + //Mask: 1 bit; Note: Clients must apply a mask + if not IsServerSide then iByte := (1 shl 7); + + //Length: 7 bits or 7+16 bits or 7+64 bits + if Length(aData) < 126 then //7 bit, 128 + iByte := iByte + Length(aData) + else if Length(aData) < 1 shl 16 then //16 bit, 65536 + iByte := iByte + 126 + else + iByte := iByte + 127; + strmData.Write(iByte, SizeOf(iByte)); + + //Extended payload length? + if Length(aData) >= 126 then + begin + //If 126, the following 2 bytes interpreted as a 16-bit unsigned integer are the payload length + if Length(aData) < 1 shl 16 then //16 bit, 65536 + begin + rLength.Lo := Length(aData); + iByte := rLength.Bytes[1]; + strmData.Write(iByte, SizeOf(iByte)); + iByte := rLength.Bytes[0]; + strmData.Write(iByte, SizeOf(iByte)); + end + else + //If 127, the following 8 bytes interpreted as a 64-bit unsigned integer (the most significant bit MUST be 0) are the payload length + begin + rLength := Int64Rec(Int64(Length(aData))); + for i := 7 downto 0 do + begin + iByte := rLength.Bytes[i]; + strmData.Write(iByte, SizeOf(iByte)); + end; + end + end; + + //Masking-key: 0 or 4 bytes; Note: Clients must apply a mask + if not IsServerSide then + begin + rMask.MaskAsInt := Random(MaxInt); + strmData.Write(rMask.MaskAsBytes[0], SizeOf(Byte)); + strmData.Write(rMask.MaskAsBytes[1], SizeOf(Byte)); + strmData.Write(rMask.MaskAsBytes[2], SizeOf(Byte)); + strmData.Write(rMask.MaskAsBytes[3], SizeOf(Byte)); + end; + + //write header + strmData.Position := 0; + TIdStreamHelper.ReadBytes(strmData, bData); + Result := Binding.Send(bData); + + //Mask? Note: Only clients must apply a mask + if IsServerSide then + begin + Result := Binding.Send(aData); + end + else + begin + iPos := 0; + iDataLength := Length(aData); + //in place masking + while iPos < iDataLength do + begin + iByte := aData[iPos] xor rMask.MaskAsBytes[iPos mod 4]; //apply mask + aData[iPos] := iByte; + inc(iPos); + end; + + //send masked data + Result := Binding.Send(aData); + end; + finally + strmData.Free; + end; +end; + +end. diff --git a/IdServerBaseHandling.pas b/IdServerBaseHandling.pas new file mode 100644 index 0000000..4b98afb --- /dev/null +++ b/IdServerBaseHandling.pas @@ -0,0 +1,11 @@ +unit IdServerBaseHandling; + +interface + +type + TIdServerBaseHandling = class + end; + +implementation + +end. diff --git a/IdServerIOHandlerWebsocket.pas b/IdServerIOHandlerWebsocket.pas new file mode 100644 index 0000000..fe10bd1 --- /dev/null +++ b/IdServerIOHandlerWebsocket.pas @@ -0,0 +1,46 @@ +unit IdServerIOHandlerWebsocket; + +interface + +uses + Classes, + IdServerIOHandlerStack, IdIOHandlerStack, IdGlobal, IdIOHandler, IdYarn, IdThread, IdSocketHandle, + IdIOHandlerWebsocket; + +type + TIdServerIOHandlerWebsocket = class(TIdServerIOHandlerStack) + protected + procedure InitComponent; override; + public + function Accept(ASocket: TIdSocketHandle; AListenerThread: TIdThread; + AYarn: TIdYarn): TIdIOHandler; override; + function MakeClientIOHandler(ATheThread:TIdYarn): TIdIOHandler; override; + end; + +implementation + +{ TIdServerIOHandlerStack_Websocket } + +function TIdServerIOHandlerWebsocket.Accept(ASocket: TIdSocketHandle; + AListenerThread: TIdThread; AYarn: TIdYarn): TIdIOHandler; +begin + Result := inherited Accept(ASocket, AListenerThread, AYarn); + if Result <> nil then + (Result as TIdIOHandlerWebsocket).IsServerSide := True; //server must not mask, only client +end; + +procedure TIdServerIOHandlerWebsocket.InitComponent; +begin + inherited InitComponent; + IOHandlerSocketClass := TIdIOHandlerWebsocket; +end; + +function TIdServerIOHandlerWebsocket.MakeClientIOHandler( + ATheThread: TIdYarn): TIdIOHandler; +begin + Result := inherited MakeClientIOHandler(ATheThread); + if Result <> nil then + (Result as TIdIOHandlerWebsocket).IsServerSide := True; //server must not mask, only client +end; + +end. diff --git a/IdServerSocketIOHandling.pas b/IdServerSocketIOHandling.pas new file mode 100644 index 0000000..d9d85dc --- /dev/null +++ b/IdServerSocketIOHandling.pas @@ -0,0 +1,194 @@ +unit IdServerSocketIOHandling; + +interface + +uses + IdContext, IdCustomTCPServer, + //IdServerWebsocketContext, + Classes, Generics.Collections, + superobject, IdException, IdServerBaseHandling, IdSocketIOHandling; + +type + TIdServerSocketIOHandling = class(TIdBaseSocketIOHandling) + protected + procedure ProcessHeatbeatRequest(const AContext: TSocketIOContext; const aText: string); override; + public + function SendToAll(const aMessage: string; const aCallback: TSocketIOMsgJSON = nil): Integer; + procedure SendTo (const aContext: TIdServerContext; const aMessage: string; const aCallback: TSocketIOMsgJSON = nil); + + function EmitEventToAll(const aEventName: string; const aData: ISuperObject; const aCallback: TSocketIOMsgJSON = nil): Integer; + procedure EmitEventTo (const aContext: TSocketIOContext; + const aEventName: string; const aData: ISuperObject; const aCallback: TSocketIOMsgJSON = nil);overload; + procedure EmitEventTo (const aContext: TIdServerContext; + const aEventName: string; const aData: ISuperObject; const aCallback: TSocketIOMsgJSON = nil);overload; + end; + +implementation + +uses + SysUtils, StrUtils; + +{ TIdServerSocketIOHandling } + +procedure TIdServerSocketIOHandling.EmitEventTo( + const aContext: TSocketIOContext; const aEventName: string; + const aData: ISuperObject; const aCallback: TSocketIOMsgJSON); +var + jsonarray: string; +begin + if aContext.IsDisconnected then + raise EIdSocketIoUnhandledMessage.Create('socket.io connection closed!'); + + if aData.IsType(stArray) then + jsonarray := aData.AsString + else if aData.IsType(stString) then + jsonarray := '["' + aData.AsString + '"]' + else + jsonarray := '[' + aData.AsString + ']'; + + if not Assigned(aCallback) then + WriteSocketIOEvent(aContext, ''{no room}, aEventName, jsonarray, nil) + else + WriteSocketIOEventRef(aContext, ''{no room}, aEventName, jsonarray, + procedure(const aData: string) + begin + aCallback(aContext, SO(aData), nil); + end); +end; + +procedure TIdServerSocketIOHandling.EmitEventTo( + const aContext: TIdServerContext; const aEventName: string; + const aData: ISuperObject; const aCallback: TSocketIOMsgJSON); +var + context: TSocketIOContext; +begin + Lock; + try + context := FConnections.Items[aContext]; + EmitEventTo(context, aEventName, aData, aCallback); + finally + UnLock; + end; +end; + +function TIdServerSocketIOHandling.EmitEventToAll(const aEventName: string; const aData: ISuperObject; + const aCallback: TSocketIOMsgJSON): Integer; +var + context: TSocketIOContext; + jsonarray: string; +begin + Result := 0; + if aData.IsType(stArray) then + jsonarray := aData.AsString + else if aData.IsType(stString) then + jsonarray := '["' + aData.AsString + '"]' + else + jsonarray := '[' + aData.AsString + ']'; + + Lock; + try + for context in FConnections.Values do + begin + if context.IsDisconnected then Continue; + + if not Assigned(aCallback) then + WriteSocketIOEvent(context, ''{no room}, aEventName, jsonarray, nil) + else + WriteSocketIOEventRef(context, ''{no room}, aEventName, jsonarray, + procedure(const aData: string) + begin + aCallback(context, SO(aData), nil); + end); + Inc(Result); + end; + for context in FConnectionsGUID.Values do + begin + if context.IsDisconnected then Continue; + + if not Assigned(aCallback) then + WriteSocketIOEvent(context, ''{no room}, aEventName, jsonarray, nil) + else + WriteSocketIOEventRef(context, ''{no room}, aEventName, jsonarray, + procedure(const aData: string) + begin + aCallback(context, SO(aData), nil); + end); + Inc(Result); + end; + finally + UnLock; + end; +end; + +procedure TIdServerSocketIOHandling.ProcessHeatbeatRequest( + const AContext: TSocketIOContext; const aText: string); +begin + inherited ProcessHeatbeatRequest(AContext, aText); +end; + +procedure TIdServerSocketIOHandling.SendTo(const aContext: TIdServerContext; + const aMessage: string; const aCallback: TSocketIOMsgJSON); +var + context: TSocketIOContext; +begin + Lock; + try + context := FConnections.Items[aContext]; + if context.IsDisconnected then + raise EIdSocketIoUnhandledMessage.Create('socket.io connection closed!'); + + if not Assigned(aCallback) then + WriteSocketIOMsg(context, ''{no room}, aMessage, nil) + else + WriteSocketIOMsg(context, ''{no room}, aMessage, + procedure(const aData: string) + begin + aCallback(context, SO(aData), nil); + end); + finally + UnLock; + end; +end; + +function TIdServerSocketIOHandling.SendToAll(const aMessage: string; + const aCallback: TSocketIOMsgJSON): Integer; +var + context: TSocketIOContext; +begin + Result := 0; + Lock; + try + for context in FConnections.Values do + begin + if context.IsDisconnected then Continue; + + if not Assigned(aCallback) then + WriteSocketIOMsg(context, ''{no room}, aMessage, nil) + else + WriteSocketIOMsg(context, ''{no room}, aMessage, + procedure(const aData: string) + begin + aCallback(context, SO(aData), nil); + end); + Inc(Result); + end; + for context in FConnectionsGUID.Values do + begin + if context.IsDisconnected then Continue; + + if not Assigned(aCallback) then + WriteSocketIOMsg(context, ''{no room}, aMessage, nil) + else + WriteSocketIOMsg(context, ''{no room}, aMessage, + procedure(const aData: string) + begin + aCallback(context, SO(aData), nil); + end); + Inc(Result); + end; + finally + UnLock; + end; +end; + +end. diff --git a/IdServerWebsocketContext.pas b/IdServerWebsocketContext.pas new file mode 100644 index 0000000..2aa3142 --- /dev/null +++ b/IdServerWebsocketContext.pas @@ -0,0 +1,81 @@ +unit IdServerWebsocketContext; + +interface + +uses + Classes, + IdCustomTCPServer, IdIOHandlerWebsocket, + IdServerBaseHandling, IdServerSocketIOHandling, IdContext; + +type + TIdServerWSContext = class; + + TWebsocketChannelRequest = procedure(const AContext: TIdServerWSContext; aType: TWSDataType; const strmRequest, strmResponse: TMemoryStream) of object; + + TIdServerWSContext = class(TIdServerContext) + private + FWebSocketKey: string; + FWebSocketVersion: Integer; + FPath: string; + FWebSocketProtocol: string; + FResourceName: string; + FOrigin: string; + FQuery: string; + FHost: string; + FWebSocketExtensions: string; + FCookie: string; + //FSocketIOPingSend: Boolean; + FOnCustomChannelExecute: TWebsocketChannelRequest; + FSocketIO: TIdServerSocketIOHandling; + FOnDestroy: TIdContextEvent; + public + function IOHandler: TIdIOHandlerWebsocket; + public + function IsSocketIO: Boolean; + property SocketIO: TIdServerSocketIOHandling read FSocketIO write FSocketIO; + //property SocketIO: TIdServerBaseHandling read FSocketIO write FSocketIO; + property OnDestroy: TIdContextEvent read FOnDestroy write FOnDestroy; + public + destructor Destroy; override; + + property Path : string read FPath write FPath; + property Query : string read FQuery write FQuery; + property ResourceName: string read FResourceName write FResourceName; + property Host : string read FHost write FHost; + property Origin : string read FOrigin write FOrigin; + property Cookie : string read FCookie write FCookie; + + property WebSocketKey : string read FWebSocketKey write FWebSocketKey; + property WebSocketProtocol : string read FWebSocketProtocol write FWebSocketProtocol; + property WebSocketVersion : Integer read FWebSocketVersion write FWebSocketVersion; + property WebSocketExtensions: string read FWebSocketExtensions write FWebSocketExtensions; + public + property OnCustomChannelExecute: TWebsocketChannelRequest read FOnCustomChannelExecute write FOnCustomChannelExecute; + end; + +implementation + +uses + StrUtils; + +{ TIdServerWSContext } + +destructor TIdServerWSContext.Destroy; +begin + if Assigned(OnDestroy) then + OnDestroy(Self); + inherited; +end; + +function TIdServerWSContext.IOHandler: TIdIOHandlerWebsocket; +begin + Result := Self.Connection.IOHandler as TIdIOHandlerWebsocket; +end; + +function TIdServerWSContext.IsSocketIO: Boolean; +begin + //FDocument = '/socket.io/1/websocket/13412152' + Result := StartsText('/socket.io/1/websocket', FPath); +end; + +end. diff --git a/IdServerWebsocketHandling.pas b/IdServerWebsocketHandling.pas new file mode 100644 index 0000000..cd4aa9b --- /dev/null +++ b/IdServerWebsocketHandling.pas @@ -0,0 +1,317 @@ +unit IdServerWebsocketHandling; + +interface + +uses + IdContext, IdCustomHTTPServer, IdHashSHA1, + IdServerSocketIOHandling, IdServerWebsocketContext, + Classes, IdServerBaseHandling, IdIOHandlerWebsocket; + +type + TIdServerSocketIOHandling_Ext = class(TIdServerSocketIOHandling) + end; + + TIdServerWebsocketHandling = class(TIdServerBaseHandling) + protected + class procedure DoWSExecute(AThread: TIdContext; aSocketIOHandler: TIdServerSocketIOHandling_Ext);virtual; + class procedure HandleWSMessage(AContext: TIdServerWSContext; aType: TWSDataType; + aRequestStrm, aResponseStrm: TMemoryStream; + aSocketIOHandler: TIdServerSocketIOHandling_Ext);virtual; + public + class function ProcessServerCommandGet(AThread: TIdServerWSContext; + ARequestInfo: TIdHTTPRequestInfo; AResponseInfo: TIdHTTPResponseInfo): Boolean; + end; + +implementation + +uses + StrUtils, SysUtils, IdCustomTCPServer, IdCoderMIME; + +{ TIdServerWebsocketHandling } + +class procedure TIdServerWebsocketHandling.DoWSExecute(AThread: TIdContext; aSocketIOHandler: TIdServerSocketIOHandling_Ext); +var + strmRequest, strmResponse: TMemoryStream; + wscode: TWSDataCode; + wstype: TWSDataType; + context: TIdServerWSContext; +begin + context := nil; + try + context := AThread as TIdServerWSContext; + //todo: make seperate function + do it after first real write (not header!) + if context.IOHandler.BusyUpgrading then + begin + context.IOHandler.IsWebsocket := True; + context.IOHandler.BusyUpgrading := False; + end; + //initial connect + if context.IsSocketIO then + begin + Assert(aSocketIOHandler <> nil); + aSocketIOHandler.WriteConnect(context); + end; + //AThread.Connection.Socket.UseNagle := False; + + while AThread.Connection.Connected do + begin + if (AThread.Connection.IOHandler.InputBuffer.Size > 0) or + AThread.Connection.IOHandler.Readable(5 * 1000) then //wait 5s, else ping the client(!) + begin + strmResponse := TMemoryStream.Create; + strmRequest := TMemoryStream.Create; + try + context := AThread as TIdServerWSContext; + + strmRequest.Position := 0; + //first is the type: text or bin + wscode := TWSDataCode(context.IOHandler.ReadLongWord); + //then the length + data = stream + context.IOHandler.ReadStream(strmRequest); + strmRequest.Position := 0; + //ignore ping/pong messages + if wscode in [wdcPing, wdcPong] then + begin + if wscode = wdcPing then + context.IOHandler.WriteData(nil, wdcPong); + Continue; + end; + + if wscode = wdcText then + wstype := wdtText + else + wstype := wdtBinary; + + HandleWSMessage(context, wstype, strmRequest, strmResponse, aSocketIOHandler); + + //write result back (of the same type: text or bin) + if strmResponse.Size > 0 then + begin + if wscode = wdcText then + context.IOHandler.Write(strmResponse, wdtText) + else + context.IOHandler.Write(strmResponse, wdtBinary) + end + else + context.IOHandler.WriteData(nil, wdcPing); + finally + strmRequest.Free; + strmResponse.Free; + end; + end + else + begin + //ping + if context.IsSocketIO then + begin + //context.SocketIOPingSend := True; + Assert(aSocketIOHandler <> nil); + aSocketIOHandler.WritePing(context); + end + else + context.IOHandler.WriteData(nil, wdcPing); + end; + + end; + finally + if context.IsSocketIO then + begin + Assert(aSocketIOHandler <> nil); + aSocketIOHandler.WriteDisConnect(context); + end; + + AThread.Data := nil; + end; +end; + +class procedure TIdServerWebsocketHandling.HandleWSMessage(AContext: TIdServerWSContext; aType: TWSDataType; + aRequestStrm, aResponseStrm: TMemoryStream; + aSocketIOHandler: TIdServerSocketIOHandling_Ext); +begin + if AContext.IsSocketIO then + begin + aRequestStrm.Position := 0; + Assert(aSocketIOHandler <> nil); + aSocketIOHandler.ProcessSocketIORequest(AContext, aRequestStrm); + end + else if Assigned(AContext.OnCustomChannelExecute) then + AContext.OnCustomChannelExecute(AContext, aType, aRequestStrm, aResponseStrm); +end; + +class function TIdServerWebsocketHandling.ProcessServerCommandGet( + AThread: TIdServerWSContext; ARequestInfo: TIdHTTPRequestInfo; + AResponseInfo: TIdHTTPResponseInfo): Boolean; +var + sValue, squid: string; + context: TIdServerWSContext; + hash: TIdHashSHA1; + guid: TGUID; +begin + (* GET /chat HTTP/1.1 + Host: server.example.com + Upgrade: websocket + Connection: Upgrade + Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ== + Origin: http://example.com + Sec-WebSocket-Protocol: chat, superchat + Sec-WebSocket-Version: 13 *) + + (* GET ws://echo.websocket.org/?encoding=text HTTP/1.1 + Origin: http://websocket.org + Cookie: __utma=99as + Connection: Upgrade + Host: echo.websocket.org + Sec-WebSocket-Key: uRovscZjNol/umbTt5uKmw== + Upgrade: websocket + Sec-WebSocket-Version: 13 *) + + //Connection: Upgrade + if not SameText('Upgrade', ARequestInfo.Connection) then + begin + //initiele ondersteuning voor socket.io + if SameText(ARequestInfo.document , '/socket.io/1/') then + begin + { + https://github.com/LearnBoost/socket.io-spec + The client will perform an initial HTTP POST request like the following + http://example.com/socket.io/1/ + 200: The handshake was successful. + The body of the response should contain the session id (sid) given to the client, followed by the heartbeat timeout, the connection closing timeout, and the list of supported transports separated by : + The absence of a heartbeat timeout ('') is interpreted as the server and client not expecting heartbeats. + For example 4d4f185e96a7b:15:10:websocket,xhr-polling. + } + AResponseInfo.ResponseNo := 200; + AResponseInfo.ResponseText := 'Socket.io connect OK'; + + CreateGUID(guid); + squid := GUIDToString(guid); + AResponseInfo.ContentText := squid + + ':15:10:websocket,xhr-polling'; + AResponseInfo.CloseConnection := False; + //(AThread.SocketIO as TIdServerSocketIOHandling_Ext).NewConnection(AThread); + (AThread.SocketIO as TIdServerSocketIOHandling_Ext).NewConnection(squid, AThread.Binding.PeerIP); + + Result := True; //handled + end + //'/socket.io/1/xhr-polling/2129478544' + else if StartsText('/socket.io/1/xhr-polling/', ARequestInfo.document) then + begin + AResponseInfo.ContentStream := TMemoryStream.Create; + AResponseInfo.CloseConnection := False; + + squid := Copy(ARequestInfo.Document, 1 + Length('/socket.io/1/xhr-polling/'), Length(ARequestInfo.document)); + if ARequestInfo.CommandType = hcGET then + (AThread.SocketIO as TIdServerSocketIOHandling_Ext) + .ProcessSocketIO_XHR(squid, ARequestInfo.PostStream, AResponseInfo.ContentStream) + else if ARequestInfo.CommandType = hcPOST then + (AThread.SocketIO as TIdServerSocketIOHandling_Ext) + .ProcessSocketIO_XHR(squid, ARequestInfo.PostStream, nil); //no response expected with POST! + Result := True; //handled + end + else + Result := False; //NOT handled + end + else + begin + Result := True; //handled + context := AThread as TIdServerWSContext; + + //Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ== + sValue := ARequestInfo.RawHeaders.Values['sec-websocket-key']; + //"The value of this header field MUST be a nonce consisting of a randomly + // selected 16-byte value that has been base64-encoded" + if (sValue <> '') then + begin + if (Length(TIdDecoderMIME.DecodeString(sValue)) = 16) then + context.WebSocketKey := sValue + else + Abort; //invalid length + end + else + //important: key must exists, otherwise stop! + Abort; + + (* + ws-URI = "ws:" "//" host [ ":" port ] path [ "?" query ] + wss-URI = "wss:" "//" host [ ":" port ] path [ "?" query ] + 2. The method of the request MUST be GET, and the HTTP version MUST be at least 1.1. + For example, if the WebSocket URI is "ws://example.com/chat", + the first line sent should be "GET /chat HTTP/1.1". + 3. The "Request-URI" part of the request MUST match the /resource + name/ defined in Section 3 (a relative URI) or be an absolute + http/https URI that, when parsed, has a /resource name/, /host/, + and /port/ that match the corresponding ws/wss URI. + *) + context.ResourceName := ARequestInfo.Document; + if ARequestInfo.UnparsedParams <> '' then + context.ResourceName := context.ResourceName + '?' + + ARequestInfo.UnparsedParams; + //seperate parts + context.Path := ARequestInfo.Document; + context.Query := ARequestInfo.UnparsedParams; + + //Host: server.example.com + context.Host := ARequestInfo.RawHeaders.Values['host']; + //Origin: http://example.com + context.Origin := ARequestInfo.RawHeaders.Values['origin']; + //Cookie: __utma=99as + context.Cookie := ARequestInfo.RawHeaders.Values['cookie']; + + //Sec-WebSocket-Version: 13 + //"The value of this header field MUST be 13" + sValue := ARequestInfo.RawHeaders.Values['sec-websocket-version']; + if (sValue <> '') then + begin + context.WebSocketVersion := StrToIntDef(sValue, 0); + if context.WebSocketVersion < 13 then + Abort; //must be at least 13 + end + else + Abort; //must exist + + context.WebSocketProtocol := ARequestInfo.RawHeaders.Values['sec-websocket-protocol']; + context.WebSocketExtensions := ARequestInfo.RawHeaders.Values['sec-websocket-extensions']; + + //Response + (* HTTP/1.1 101 Switching Protocols + Upgrade: websocket + Connection: Upgrade + Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo= *) + AResponseInfo.ResponseNo := 101; + AResponseInfo.ResponseText := 'Switching Protocols'; + AResponseInfo.CloseConnection := False; + //Connection: Upgrade + AResponseInfo.Connection := 'Upgrade'; + //Upgrade: websocket + AResponseInfo.CustomHeaders.Values['Upgrade'] := 'websocket'; + + //Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo= + sValue := Trim(context.WebSocketKey) + //... "minus any leading and trailing whitespace" + '258EAFA5-E914-47DA-95CA-C5AB0DC85B11'; //special GUID + hash := TIdHashSHA1.Create; + try + sValue := TIdEncoderMIME.EncodeBytes( //Base64 + hash.HashString(sValue) ); //SHA1 + finally + hash.Free; + end; + AResponseInfo.CustomHeaders.Values['Sec-WebSocket-Accept'] := sValue; + + //send same protocol back? + AResponseInfo.CustomHeaders.Values['Sec-WebSocket-Protocol'] := context.WebSocketProtocol; + //we do not support extensions yet (gzip deflate compression etc) + //AResponseInfo.CustomHeaders.Values['Sec-WebSocket-Extensions'] := context.WebSocketExtensions; + //http://www.lenholgate.com/blog/2011/07/websockets---the-deflate-stream-extension-is-broken-and-badly-designed.html + //but is could be done using idZlib.pas and DecompressGZipStream etc + + //send response back + context.IOHandler.InputBuffer.Clear; + context.IOHandler.BusyUpgrading := True; + AResponseInfo.WriteHeader; + + //handle all WS communication in seperate loop + DoWSExecute(AThread, (context.SocketIO as TIdServerSocketIOHandling_Ext) ); + end; +end; + +end. diff --git a/IdSocketIOHandling.pas b/IdSocketIOHandling.pas new file mode 100644 index 0000000..1472b23 --- /dev/null +++ b/IdSocketIOHandling.pas @@ -0,0 +1,1239 @@ +unit IdSocketIOHandling; + +interface + +uses + Classes, Generics.Collections, + superobject, + IdServerBaseHandling, IdContext, IdException, IdIOHandlerWebsocket, IdHTTP, + SyncObjs; + +type + TSocketIOContext = class; + TSocketIOCallbackObj = class; + TIdBaseSocketIOHandling = class; + TIdSocketIOHandling = class; + ISocketIOContext = interface; + + TSocketIOMsg = reference to procedure(const ASocket: ISocketIOContext; const aText: string; const aCallback: TSocketIOCallbackObj); + TSocketIOMsgJSON = reference to procedure(const ASocket: ISocketIOContext; const aJSON: ISuperObject; const aCallback: TSocketIOCallbackObj); + TSocketIONotify = reference to procedure(const ASocket: ISocketIOContext); + TSocketIOEvent = reference to procedure(const ASocket: ISocketIOContext; const aArgument: TSuperArray; const aCallbackObj: TSocketIOCallbackObj); + + TSocketIONotifyList = class(TList); + TSocketIOEventList = class(TList); + + EIdSocketIoUnhandledMessage = class(EIdSilentException); + + ISocketIOContext = interface + ['{ACCAC678-054C-4D75-8BAD-5922F55623AB}'] + function ResourceName: string; + function PeerIP: string; + function PeerPort: Integer; + + procedure EmitEvent(const aEventName: string; const aData: ISuperObject; const aCallback: TSocketIOMsgJSON = nil); + procedure Send(const aData: string; const aCallback: TSocketIOMsgJSON = nil); + procedure SendJSON(const aJSON: ISuperObject; const aCallback: TSocketIOMsgJSON = nil); + end; + + TSocketIOContext = class(TInterfacedObject, + ISocketIOContext) + private + FPingSend: Boolean; + FConnectSend: Boolean; + FGUID: string; + FPeerIP: string; + procedure SetContext(const Value: TIdContext); + procedure SetConnectSend(const Value: Boolean); + procedure SetPingSend(const Value: Boolean); + protected + FHandling: TIdBaseSocketIOHandling; + FContext: TIdContext; + FIOHandler: TIdIOHandlerWebsocket; + FClient: TIdHTTP; + FEvent: TEvent; + FQueue: TList; + procedure QueueData(const aData: string); + procedure ServerContextDestroy(AContext: TIdContext); + public + constructor Create();overload; + constructor Create(aClient: TIdHTTP);overload; + destructor Destroy; override; + + procedure Lock; + procedure UnLock; + function WaitForQueue(aTimeout_ms: Integer): string; + + function ResourceName: string; + function PeerIP: string; + function PeerPort: Integer; + + property GUID: string read FGUID; + property Context: TIdContext read FContext write SetContext; + property PingSend: Boolean read FPingSend write SetPingSend; + property ConnectSend: Boolean read FConnectSend write SetConnectSend; + + function IsDisconnected: Boolean; + + //todo: OnEvent per socket + //todo: store session info per connection (see Socket.IO Set + Get -> Storing data associated to a client) + //todo: namespace using "Of" + procedure EmitEvent(const aEventName: string; const aData: ISuperObject; const aCallback: TSocketIOMsgJSON = nil); +// procedure BroadcastEventToOthers(const aEventName: string; const aData: ISuperObject; const aCallback: TSocketIOMsgJSON = nil); + procedure Send(const aData: string; const aCallback: TSocketIOMsgJSON = nil); + procedure SendJSON(const aJSON: ISuperObject; const aCallback: TSocketIOMsgJSON = nil); + end; + + TSocketIOCallbackObj = class + protected + FHandling: TIdBaseSocketIOHandling; + FSocket: TSocketIOContext; + FMsgNr: Integer; + public + procedure SendResponse(const aResponse: string); + end; + + TIdBaseSocketIOHandling = class(TIdServerBaseHandling) + protected + FConnections: TObjectDictionary; + FConnectionsGUID: TObjectDictionary; + + FOnConnectionList, + FOnDisconnectList: TSocketIONotifyList; + FOnEventList: TObjectDictionary; + FOnSocketIOMsg: TSocketIOMsg; + FOnSocketIOJson: TSocketIOMsgJSON; + + procedure ProcessEvent(const AContext: TSocketIOContext; const aText: string; aMsgNr: Integer; aHasCallback: Boolean); + protected + type + TSocketIOCallback = procedure(const aData: string) of object; + TSocketIOCallbackRef = reference to procedure(const aData: string); + var + FSocketIOMsgNr: Integer; + FSocketIOEventCallback: TDictionary; + FSocketIOEventCallbackRef: TDictionary; + + function WriteConnect(const ASocket: TSocketIOContext): string; overload; + procedure WriteDisConnect(const ASocket: TSocketIOContext);overload; + procedure WritePing(const ASocket: TSocketIOContext);overload; + // + function WriteConnect(const AContext: TIdContext): string; overload; + procedure WriteDisConnect(const AContext: TIdContext);overload; + procedure WritePing(const AContext: TIdContext);overload; + + procedure WriteSocketIOMsg(const ASocket: TSocketIOContext; const aRoom, aData: string; aCallback: TSocketIOCallbackRef = nil); + procedure WriteSocketIOJSON(const ASocket: TSocketIOContext; const aRoom, aJSON: string; aCallback: TSocketIOCallbackRef = nil); + procedure WriteSocketIOEvent(const ASocket: TSocketIOContext; const aRoom, aEventName, aJSONArray: string; aCallback: TSocketIOCallback); + procedure WriteSocketIOEventRef(const ASocket: TSocketIOContext; const aRoom, aEventName, aJSONArray: string; aCallback: TSocketIOCallbackRef); + procedure WriteSocketIOResult(const ASocket: TSocketIOContext; aRequestMsgNr: Integer; const aRoom, aData: string); + + procedure ProcessSocketIO_XHR(const aGUID: string; const aStrmRequest, aStrmResponse: TStream); + + procedure ProcessSocketIORequest(const ASocket: TSocketIOContext; const strmRequest: TMemoryStream);overload; + procedure ProcessSocketIORequest(const ASocket: TSocketIOContext; const aData: string);overload; + procedure ProcessSocketIORequest(const AContext: TIdContext; const strmRequest: TMemoryStream);overload; + + procedure ProcessHeatbeatRequest(const ASocket: TSocketIOContext; const aText: string);virtual; + procedure ProcessCloseChannel(const ASocket: TSocketIOContext; const aChannel: string);virtual; + function WriteString(const ASocket: TSocketIOContext; const aText: string): string; virtual; + public + procedure AfterConstruction; override; + destructor Destroy; override; + + procedure Lock; + procedure UnLock; + +// procedure EmitEventToAll(const aEventName: string; const aData: ISuperObject; const aCallback: TSocketIOMsgJSON = nil); + function NewConnection(const AContext: TIdContext): TSocketIOContext;overload; + function NewConnection(const aGUID, aPeerIP: string): TSocketIOContext;overload; + procedure FreeConnection(const AContext: TIdContext);overload; + procedure FreeConnection(const ASocket: TSocketIOContext);overload; + + property OnSocketIOMsg : TSocketIOMsg read FOnSocketIOMsg write FOnSocketIOMsg; + property OnSocketIOJson : TSocketIOMsgJSON read FOnSocketIOJson write FOnSocketIOJson; + + procedure OnEvent (const aEventName: string; const aCallback: TSocketIOEvent); + procedure OnConnection(const aCallback: TSocketIONotify); + procedure OnDisconnect(const aCallback: TSocketIONotify); + end; + + TIdSocketIOHandling = class(TIdBaseSocketIOHandling) + public + procedure Send(const aMessage: string; const aCallback: TSocketIOMsgJSON = nil); + procedure Emit(const aEventName: string; const aData: ISuperObject; const aCallback: TSocketIOMsgJSON = nil); + end; + +implementation + +uses + SysUtils, StrUtils, IdServerWebsocketContext, IdHTTPWebsocketClient, Windows; + +procedure TIdBaseSocketIOHandling.AfterConstruction; +begin + inherited; + FConnections := TObjectDictionary.Create([doOwnsValues]); + FConnectionsGUID := TObjectDictionary.Create([doOwnsValues]); + + FOnConnectionList := TSocketIONotifyList.Create; + FOnDisconnectList := TSocketIONotifyList.Create; + FOnEventList := TObjectDictionary.Create([doOwnsValues]); + + FSocketIOEventCallback := TDictionary.Create; + FSocketIOEventCallbackRef := TDictionary.Create; +end; + +destructor TIdBaseSocketIOHandling.Destroy; +var squid: string; + idcontext: TIdContext; +begin + FSocketIOEventCallback.Free; + FSocketIOEventCallbackRef.Free; + + FOnEventList.Free; + FOnConnectionList.Free; + FOnDisconnectList.Free; + + while FConnections.Count > 0 do + for idcontext in FConnections.Keys do + begin + FConnections.Items[idcontext]._Release; + FConnections.ExtractPair(idcontext); + end; + while FConnectionsGUID.Count > 0 do + for squid in FConnectionsGUID.Keys do + begin + FConnectionsGUID.Items[squid]._Release; + FConnectionsGUID.ExtractPair(squid); + end; + FConnections.Free; + FConnectionsGUID.Free; + + inherited; +end; + +procedure TIdBaseSocketIOHandling.FreeConnection( + const ASocket: TSocketIOContext); +var squid: string; + idcontext: TIdContext; +begin + if ASocket = nil then Exit; + + ASocket.Context := nil; + ASocket.FIOHandler := nil; + ASocket.FClient := nil; + ASocket.FHandling := nil; + ASocket.FGUID := ''; + ASocket.FPeerIP := ''; + + for idcontext in FConnections.Keys do + begin + if FConnections.Items[idcontext] = ASocket then + begin + FConnections.ExtractPair(idcontext); + ASocket._Release; + end; + end; + for squid in FConnectionsGUID.Keys do + begin + if FConnectionsGUID.Items[squid] = ASocket then + begin + FConnectionsGUID.ExtractPair(squid); + ASocket._Release; //use reference count? otherwise AV when used in TThread.Queue + end; + end; +end; + +procedure TIdBaseSocketIOHandling.FreeConnection(const AContext: TIdContext); +var + socket: TSocketIOContext; +begin + Lock; + try + if FConnections.TryGetValue(AContext, socket) then + FreeConnection(socket); + finally + UnLock; + end; +end; + +procedure TIdBaseSocketIOHandling.Lock; +begin + System.TMonitor.Enter(Self); +end; + +function TIdBaseSocketIOHandling.NewConnection( + const AGUID, aPeerIP: string): TSocketIOContext; +var + socket: TSocketIOContext; +begin + Lock; + try + if not FConnectionsGUID.TryGetValue(AGUID, socket) then + begin + socket := TSocketIOContext.Create; + socket._AddRef; + FConnectionsGUID.Add(AGUID, socket); + end; + //socket.Context := AContext; + socket.FGUID := AGUID; + if aPeerIP <> '' then + socket.FPeerIP := aPeerIP; + socket.FHandling := Self; + socket.FConnectSend := False; + socket.FPingSend := False; + Result := socket; + finally + UnLock; + end; +end; + +function TIdBaseSocketIOHandling.NewConnection(const AContext: TIdContext): TSocketIOContext; +var + socket: TSocketIOContext; +begin + Lock; + try + if not FConnections.TryGetValue(AContext, socket) then + begin + socket := TSocketIOContext.Create; + socket._AddRef; + FConnections.Add(AContext, socket); + end; + socket.Context := AContext; + socket.FHandling := Self; + socket.FConnectSend := False; + socket.FPingSend := False; + Result := socket; + finally + UnLock; + end; +end; + +procedure TIdBaseSocketIOHandling.OnConnection(const aCallback: TSocketIONotify); +var context: TSocketIOContext; +begin + FOnConnectionList.Add(aCallback); + + Lock; + try + for context in FConnections.Values do + aCallback(context); + for context in FConnectionsGUID.Values do + aCallback(context); + finally + UnLock; + end; +end; + +procedure TIdBaseSocketIOHandling.OnDisconnect(const aCallback: TSocketIONotify); +begin + FOnDisconnectList.Add(aCallback); +end; + +procedure TIdBaseSocketIOHandling.OnEvent(const aEventName: string; + const aCallback: TSocketIOEvent); +var list: TSocketIOEventList; +begin + if not FOnEventList.TryGetValue(aEventName, list) then + begin + list := TSocketIOEventList.Create; + FOnEventList.Add(aEventName, list); + end; + list.Add(aCallback); +end; + +procedure TIdBaseSocketIOHandling.ProcessCloseChannel( + const ASocket: TSocketIOContext; const aChannel: string); +begin + if aChannel <> '' then + //todo: close channel + else + ASocket.FContext.Connection.Disconnect; +end; + +procedure TIdBaseSocketIOHandling.ProcessEvent( + const AContext: TSocketIOContext; const aText: string; aMsgNr: Integer; + aHasCallback: Boolean); +var + json: ISuperObject; + name: string; + args: TSuperArray; + list: TSocketIOEventList; + event: TSocketIOEvent; + callback: TSocketIOCallbackObj; +// socket: TSocketIOContext; +begin + //'5:' [message id ('+')] ':' [message endpoint] ':' [json encoded event] + //5::/chat:{"name":"my other event","args":[{"my":"data"}]} + //5:1+:/chat:{"name":"GetLocations","args":[""]} + json := SO(aText); +// args := nil; + try + name := json.S['name']; //"my other event + args := json.A['args']; //[{"my":"data"}] + + if FOnEventList.TryGetValue(name, list) then + begin + if list.Count = 0 then + raise EIdSocketIoUnhandledMessage.Create(aText); + +// socket := FConnections.Items[AContext]; + if aHasCallback then + begin + callback := TSocketIOCallbackObj.Create; + callback.FHandling := Self; + callback.FSocket := AContext; + callback.FMsgNr := aMsgNr; + end + else + callback := nil; + try + try + for event in list do + event(AContext, args, callback); + except + on E:Exception do + begin + if callback <> nil then + callback.SendResponse( SO(['Error', e.Message]).AsJSon ); + end; + end; + finally + callback.Free; + end; + end + else + raise EIdSocketIoUnhandledMessage.Create(aText); + finally +// args.Free; + json := nil; + end; +end; + +procedure TIdBaseSocketIOHandling.ProcessHeatbeatRequest(const ASocket: TSocketIOContext; const aText: string); +begin + if ASocket.PingSend then + ASocket.PingSend := False //reset, client responded with 2:: heartbeat too + else + begin + ASocket.PingSend := True; //stop infinite ping response loops + WriteString(ASocket, aText); //write same connect back, e.g. 2:: + end; +end; + +procedure TIdBaseSocketIOHandling.ProcessSocketIORequest( + const ASocket: TSocketIOContext; const strmRequest: TMemoryStream); + + function __ReadToEnd: string; + var + utf8: TBytes; + ilength: Integer; + begin + Result := ''; + ilength := strmRequest.Size - strmRequest.Position; + SetLength(utf8, ilength); + strmRequest.Read(utf8[0], ilength); + Result := TEncoding.UTF8.GetString(utf8); + end; + +var str: string; +begin + str := __ReadToEnd; + ProcessSocketIORequest(ASocket, str); +end; + +procedure TIdBaseSocketIOHandling.ProcessSocketIORequest(const AContext: TIdContext; + const strmRequest: TMemoryStream); +var + socket: TSocketIOContext; +begin + if not FConnections.TryGetValue(AContext, socket) then + begin + socket := NewConnection(AContext); + end; + ProcessSocketIORequest(socket, strmRequest); +end; + +procedure TIdBaseSocketIOHandling.ProcessSocketIO_XHR(const aGUID: string; // const AContext: TIdContext; + const aStrmRequest, aStrmResponse: TStream); +var + socket: TSocketIOContext; + sdata: string; + i, ilength: Integer; + bytes, singlemsg: TBytes; +begin + if not FConnectionsGUID.TryGetValue(aGUID, socket) or + socket.IsDisconnected + then + socket := NewConnection(aGUID, ''); + + if not socket.FConnectSend then + WriteConnect(socket); + + if (aStrmRequest <> nil) and + (aStrmRequest.Size > 0) then + begin + aStrmRequest.Position := 0; + SetLength(bytes, aStrmRequest.Size); + aStrmRequest.Read(bytes[0], aStrmRequest.Size); + + if (Length(bytes) > 3) and + (bytes[0] = 239) and (bytes[1] = 191) and (bytes[2] = 189) then + begin + //io.parser.encodePayload(msgs) + //'\ufffd' + packet.length + '\ufffd' + //'�17�3:::singlemessage�52�5:4+::{"name":"registerScanner","args":["scanner1"]}' + while bytes <> nil do + begin + i := 3; + //search second '\ufffd' + while not ( (bytes[i+0] = 239) and (bytes[i+1] = 191) and (bytes[i+2] = 189) ) do + begin + Inc(i); + if i+2 > High(bytes) then Exit; //wrong data + end; + //get data between + ilength := StrToInt( TEncoding.UTF8.GetString(bytes, 3, i-3) ); //17 + + singlemsg := Copy(bytes, i+3, ilength); + bytes := Copy(bytes, i+3 + ilength, Length(bytes)); + sdata := TEncoding.UTF8.GetString(singlemsg); //3:::singlemessage + try + ProcessSocketIORequest(socket, sdata); + except + //next + end; + end; + end + else + begin + sdata := TEncoding.UTF8.GetString(bytes); + ProcessSocketIORequest(socket, sdata); + end; + end; + + //e.g. POST, no GET? + if aStrmResponse = nil then Exit; + + //wait till some response data to be send (long polling) + sdata := socket.WaitForQueue(5 * 1000); + if sdata = '' then + begin + //no data? then send ping + WritePing(socket); + sdata := socket.WaitForQueue(0); + end; + //send response back + if sdata <> '' then + begin + {$WARN SYMBOL_PLATFORM OFF} + if DebugHook <> 0 then + Windows.OutputDebugString(PChar('Send: ' + sdata)); + + bytes := TEncoding.UTF8.GetBytes(sdata); + aStrmResponse.Write(bytes[0], Length(bytes)); + end; +end; + +procedure TIdBaseSocketIOHandling.UnLock; +begin + System.TMonitor.Exit(Self); +end; + +procedure TIdBaseSocketIOHandling.ProcessSocketIORequest( + const ASocket: TSocketIOContext; const aData: string); + + function __GetSocketIOPart(const aData: string; aIndex: Integer): string; + var ipos: Integer; + i: Integer; + begin + //'5::/chat:{"name":"hi!"}' + //0 = 5 + //1 = + //2 = /chat + //3 = {"name":"hi!"} + ipos := 0; + for i := 0 to aIndex-1 do + ipos := PosEx(':', aData, ipos+1); + if ipos >= 0 then + begin + Result := Copy(aData, ipos+1, Length(aData)); + if aIndex < 3 then // /chat:{"name":"hi!"}' + begin + ipos := PosEx(':', Result, 1); // :{"name":"hi!"}' + if ipos > 0 then + Result := Copy(Result, 1, ipos-1); // /chat + end; + end; + end; + +var + str, smsg, schannel, sdata: string; + imsg: Integer; + bCallback: Boolean; +// socket: TSocketIOContext; + callback: TSocketIOCallback; + callbackref: TSocketIOCallbackRef; + callbackobj: TSocketIOCallbackObj; +begin + if not FConnections.ContainsValue(ASocket) and + not FConnectionsGUID.ContainsValue(ASocket) then + begin + Lock; + try + ASocket._AddRef; + FConnections.Add(nil, ASocket); //clients do not have a TIdContext? + finally + UnLock; + end; + end; + + str := aData; + if str = '' then Exit; + if DebugHook <> 0 then + Windows.OutputDebugString(PChar('Received: ' + str)); + + //5:1+:/chat:test + smsg := __GetSocketIOPart(str, 1); + imsg := 0; + bCallback := False; + if smsg <> '' then // 1+ + begin + imsg := StrToIntDef(ReplaceStr(smsg,'+',''), 0); // 1 + bCallback := (Pos('+', smsg) > 1); //trailing +, e.g. 1+ + end; + schannel := __GetSocketIOPart(str, 2); // /chat + sdata := __GetSocketIOPart(str, 3); // test + + //(0) Disconnect + if StartsStr('0:', str) then + begin + schannel := __GetSocketIOPart(str, 2); + ProcessCloseChannel(ASocket, schannel); + end + //(1) Connect + //'1::' [path] [query] + else if StartsStr('1:', str) then + begin + //todo: add channel/room to authorized channel/room list + if not ASocket.ConnectSend then + WriteString(ASocket, str); //write same connect back, e.g. 1::/chat + end + //(2) Heartbeat + else if StartsStr('2:', str) then + begin + //todo: timer to disconnect client if no ping within time + ProcessHeatbeatRequest(ASocket, str); + end + //(3) Message (https://github.com/LearnBoost/socket.io-spec#3-message) + //'3:' [message id ('+')] ':' [message endpoint] ':' [data] + //3::/chat:hi + else if StartsStr('3:', str) then + begin + if Assigned(OnSocketIOMsg) then + begin + if bCallback then + begin + callbackobj := TSocketIOCallbackObj.Create; + try + callbackobj.FHandling := Self; + callbackobj.FSocket := ASocket; + callbackobj.FMsgNr := imsg; + OnSocketIOMsg(ASocket, sdata, callbackobj); //, imsg, bCallback); + finally + callbackobj.Free; + end + end + else + OnSocketIOMsg(ASocket, sdata, nil); + end + else + raise EIdSocketIoUnhandledMessage.Create(str); + end + //(4) JSON Message + //'4:' [message id ('+')] ':' [message endpoint] ':' [json] + //4:1::{"a":"b"} + else if StartsStr('4:', str) then + begin + if Assigned(OnSocketIOJson) then + begin + if bCallback then + begin + callbackobj := TSocketIOCallbackObj.Create; + try + callbackobj.FHandling := Self; + callbackobj.FSocket := ASocket; + callbackobj.FMsgNr := imsg; + OnSocketIOJson(ASocket, SO(sdata), callbackobj); //, imsg, bCallback); + finally + callbackobj.Free; + end + end + else + OnSocketIOJson(ASocket, SO(sdata), nil); //, imsg, bCallback); + end + else + raise EIdSocketIoUnhandledMessage.Create(str); + end + //(5) Event + //'5:' [message id ('+')] ':' [message endpoint] ':' [json encoded event] + //5::/chat:{"name":"my other event","args":[{"my":"data"}]} + //5:1+:/chat:{"name":"GetLocations","args":[""]} + else if StartsStr('5:', str) then + begin + //if Assigned(OnSocketIOEvent) then + // OnSocketIOEvent(AContext, sdata, imsg, bCallback); + try + ProcessEvent(ASocket, sdata, imsg, bCallback); + except + on e:exception do + // + end + end + //(6) ACK + //6::/news:1+["callback"] + //6:::1+["Response"] + else if StartsStr('6:', str) then + begin + smsg := Copy(sdata, 1, Pos('+', sData)-1); + imsg := StrToIntDef(smsg, 0); + sData := Copy(sdata, Pos('+', sData)+1, Length(sData)); + + if FSocketIOEventCallback.TryGetValue(imsg, callback) then + begin + FSocketIOEventCallback.Remove(imsg); + callback(sdata); + end + else if FSocketIOEventCallbackRef.TryGetValue(imsg, callbackref) then + begin + FSocketIOEventCallbackRef.Remove(imsg); + callbackref(sdata); + end + else ; + //raise EIdSocketIoUnhandledMessage.Create(str); + end + //(7) Error + else if StartsStr('7:', str) then + raise EIdSocketIoUnhandledMessage.Create(str) + //(8) Noop + else if StartsStr('8:', str) then + begin + //nothing + end + else + raise Exception.CreateFmt('Unsupported data: "%s"', [str]); +end; + +function TIdBaseSocketIOHandling.WriteConnect( + const ASocket: TSocketIOContext): string; +var + notify: TSocketIONotify; +begin + Lock; + try + if not FConnections.ContainsValue(ASocket) and + not FConnectionsGUID.ContainsValue(ASocket) then + begin + ASocket._AddRef; + FConnections.Add(nil, ASocket); //clients do not have a TIdContext? + end; + + if not ASocket.ConnectSend then + begin + ASocket.ConnectSend := True; + Result := WriteString(ASocket, '1::'); + end; + finally + UnLock; + end; + + for notify in FOnConnectionList do + notify(ASocket); +end; + +procedure TIdBaseSocketIOHandling.WriteDisConnect( + const ASocket: TSocketIOContext); +var + notify: TSocketIONotify; +begin + if ASocket = nil then Exit; + for notify in FOnDisconnectList do + notify(ASocket); + + Lock; + try + if not ASocket.IsDisconnected then + try + WriteString(ASocket, '0::'); + except + end; + FreeConnection(ASocket); + finally + UnLock; + end; +end; + +procedure TIdBaseSocketIOHandling.WritePing( + const ASocket: TSocketIOContext); +begin + ASocket.PingSend := True; + WriteString(ASocket, '2::') //heartbeat: https://github.com/LearnBoost/socket.io-spec +end; + +procedure TIdBaseSocketIOHandling.WriteSocketIOEvent(const ASocket: TSocketIOContext; const aRoom, aEventName, + aJSONArray: string; aCallback: TSocketIOCallback); +var + sresult: string; +begin + //see TROIndyHTTPWebsocketServer.ProcessSocketIORequest too + //5:1+:/chat:{"name":"GetLocations","args":[""]} + Inc(FSocketIOMsgNr); + if not Assigned(aCallback) then + sresult := Format('5:%d:%s:{"name":"%s", "args":%s}', + [FSocketIOMsgNr, aRoom, aEventName, aJSONArray]) + else + begin + if FSocketIOEventCallback = nil then + FSocketIOEventCallback := TDictionary.Create; + sresult := Format('5:%d+:%s:{"name":"%s", "args":%s}', + [FSocketIOMsgNr, aRoom, aEventName, aJSONArray]); + FSocketIOEventCallback.Add(FSocketIOMsgNr, aCallback); + end; + WriteString(ASocket, sresult); +end; + +procedure TIdBaseSocketIOHandling.WriteSocketIOEventRef(const ASocket: TSocketIOContext; + const aRoom, aEventName, aJSONArray: string; aCallback: TSocketIOCallbackRef); +var + sresult: string; +begin + //see TROIndyHTTPWebsocketServer.ProcessSocketIORequest too + //5:1+:/chat:{"name":"GetLocations","args":[""]} + Inc(FSocketIOMsgNr); + if not Assigned(aCallback) then + sresult := Format('5:%d:%s:{"name":"%s", "args":%s}', + [FSocketIOMsgNr, aRoom, aEventName, aJSONArray]) + else + begin + if FSocketIOEventCallbackRef = nil then + FSocketIOEventCallbackRef := TDictionary.Create; + sresult := Format('5:%d+:%s:{"name":"%s", "args":%s}', + [FSocketIOMsgNr, aRoom, aEventName, aJSONArray]); + FSocketIOEventCallbackRef.Add(FSocketIOMsgNr, aCallback); + end; + WriteString(ASocket, sresult); +end; + +procedure TIdBaseSocketIOHandling.WriteSocketIOJSON(const ASocket: TSocketIOContext; + const aRoom, aJSON: string; aCallback: TSocketIOCallbackRef = nil); +var + sresult: string; +begin + //see TROIndyHTTPWebsocketServer.ProcessSocketIORequest too + //4:1::{"a":"b"} + Inc(FSocketIOMsgNr); + + if not Assigned(aCallback) then + sresult := Format('4:%d:%s:%s', [FSocketIOMsgNr, aRoom, aJSON]) + else + begin + if FSocketIOEventCallbackRef = nil then + FSocketIOEventCallbackRef := TDictionary.Create; + sresult := Format('4:%d+:%s:%s', + [FSocketIOMsgNr, aRoom, aJSON]); + FSocketIOEventCallbackRef.Add(FSocketIOMsgNr, aCallback); + end; + + WriteString(ASocket, sresult); +end; + +procedure TIdBaseSocketIOHandling.WriteSocketIOMsg(const ASocket: TSocketIOContext; + const aRoom, aData: string; aCallback: TSocketIOCallbackRef = nil); +var + sresult: string; +begin + //see TROIndyHTTPWebsocketServer.ProcessSocketIORequest too + //3::/chat:hi + Inc(FSocketIOMsgNr); + + if not Assigned(aCallback) then + sresult := Format('3:%d:%s:%s', [FSocketIOMsgNr, aRoom, aData]) + else + begin + if FSocketIOEventCallbackRef = nil then + FSocketIOEventCallbackRef := TDictionary.Create; + sresult := Format('3:%d+:%s:%s', + [FSocketIOMsgNr, aRoom, aData]); + FSocketIOEventCallbackRef.Add(FSocketIOMsgNr, aCallback); + end; + + WriteString(ASocket, sresult); +end; + +procedure TIdBaseSocketIOHandling.WriteSocketIOResult(const ASocket: TSocketIOContext; + aRequestMsgNr: Integer; const aRoom, aData: string); +var + sresult: string; +begin + //6::/news:2+["callback"] + sresult := Format('6::%s:%d+[%s]', [aRoom, aRequestMsgNr, aData]); + WriteString(ASocket, sresult); +end; + +function TIdBaseSocketIOHandling.WriteString(const ASocket: TSocketIOContext; + const aText: string): string; +begin + if ASocket = nil then Exit; + + ASocket.Lock; + try + if ASocket.FIOHandler = nil then + begin + if ASocket.FContext <> nil then + ASocket.FIOHandler := (ASocket.FContext as TIdServerWSContext).IOHandler; + end; + + if (ASocket.FIOHandler <> nil) then + begin + Assert(ASocket.FIOHandler.IsWebsocket); + if DebugHook <> 0 then + Windows.OutputDebugString(PChar('Send: ' + aText)); + ASocket.FIOHandler.Write(aText); + end + else if ASocket.GUID <> '' then + begin + ASocket.QueueData(aText); + Result := aText; //for xhr-polling the data must be send using responseinfo instead of direct write! + end + else //disconnected + Assert(False, 'disconnected'); + finally + ASocket.UnLock; + end; +end; + +{ TSocketIOCallbackObj } + +procedure TSocketIOCallbackObj.SendResponse(const aResponse: string); +begin + FHandling.WriteSocketIOResult(FSocket, FMsgNr, '', aResponse); +end; + +{ TSocketIOContext } + +constructor TSocketIOContext.Create(aClient: TIdHTTP); +begin + FClient := aClient; + if aClient is TIdHTTPWebsocketClient then + begin + FHandling := (aClient as TIdHTTPWebsocketClient).SocketIO; + end; + FIOHandler := (aClient as TIdHTTPWebsocketClient).IOHandler; +end; + +destructor TSocketIOContext.Destroy; +begin + FEvent.Free; + FQueue.Free; + inherited; +end; + +procedure TSocketIOContext.EmitEvent(const aEventName: string; const aData: ISuperObject; + const aCallback: TSocketIOMsgJSON); +begin + if not Assigned(aCallback) then + FHandling.WriteSocketIOEvent(Self, '', aEventName, aData.AsJSon, nil) + else + begin + FHandling.WriteSocketIOEventRef(Self, '', aEventName, aData.AsJSon, + procedure(const aData: string) + begin + aCallback(Self, SO(aData), nil); + end); + end; +end; + +function TSocketIOContext.IsDisconnected: Boolean; +begin + Result := (FClient = nil) and (FContext = nil) and (FGUID = ''); +end; + +procedure TSocketIOContext.Lock; +begin + System.TMonitor.Enter(Self); +end; + +constructor TSocketIOContext.Create; +begin + // +end; + +function TSocketIOContext.PeerIP: string; +begin + Result := FPeerIP; + if FContext is TIdServerWSContext then + Result := (FContext as TIdServerWSContext).Binding.PeerIP + else if FIOHandler <> nil then + Result := FIOHandler.Binding.PeerIP; +end; + +function TSocketIOContext.PeerPort: Integer; +begin + Result := 0; + if FContext is TIdServerWSContext then + Result := (FContext as TIdServerWSContext).Binding.PeerPort + else if FIOHandler <> nil then + Result := FIOHandler.Binding.PeerPort +end; + +procedure TSocketIOContext.QueueData(const aData: string); +begin + if FEvent = nil then + FEvent := TEvent.Create; + if FQueue = nil then + FQueue := TList.Create; + + FQueue.Add(aData); + FEvent.SetEvent; +end; + +function TSocketIOContext.ResourceName: string; +begin + if FContext is TIdServerWSContext then + Result := (FContext as TIdServerWSContext).ResourceName + else if FClient <> nil then + Result := (FClient as TIdHTTPWebsocketClient).WSResourceName +end; + +procedure TSocketIOContext.Send(const aData: string; + const aCallback: TSocketIOMsgJSON); +begin + if not Assigned(aCallback) then + FHandling.WriteSocketIOMsg(Self, '', aData) + else + begin + FHandling.WriteSocketIOMsg(Self, '', aData, + procedure(const aData: string) + begin + aCallback(Self, SO(aData), nil); + end); + end; +end; + +procedure TSocketIOContext.SendJSON(const aJSON: ISuperObject; + const aCallback: TSocketIOMsgJSON); +begin + if not Assigned(aCallback) then + FHandling.WriteSocketIOJSON(Self, '', aJSON.AsJSon()) + else + begin + FHandling.WriteSocketIOMsg(Self, '', aJSON.AsJSon(), + procedure(const aData: string) + begin + aCallback(Self, SO(aData), nil); + end); + end; +end; + +procedure TSocketIOContext.ServerContextDestroy(AContext: TIdContext); +begin + Self.Context := nil; + Self.FIOHandler := nil; +end; + +procedure TSocketIOContext.SetConnectSend(const Value: Boolean); +begin + FConnectSend := Value; +end; + +procedure TSocketIOContext.SetContext(const Value: TIdContext); +begin + if (Value <> FContext) and (Value = nil) and + (FContext is TIdServerWSContext) then + (FContext as TIdServerWSContext).OnDestroy := nil; + + FContext := Value; + + if FContext is TIdServerWSContext then + (FContext as TIdServerWSContext).OnDestroy := Self.ServerContextDestroy; +end; + +procedure TSocketIOContext.SetPingSend(const Value: Boolean); +begin + FPingSend := Value; +end; + +procedure TSocketIOContext.UnLock; +begin + System.TMonitor.Exit(Self); +end; + +function TSocketIOContext.WaitForQueue(aTimeout_ms: Integer): string; +begin + if (FEvent = nil) or (FQueue = nil) then + begin + Lock; + try + if FEvent = nil then + FEvent := TEvent.Create; + if FQueue = nil then + FQueue := TList.Create; + finally + UnLock; + end; + end; + + if (FQueue.Count > 0) or + (FEvent.WaitFor(aTimeout_ms) = wrSignaled) then + begin + Lock; + try + FEvent.ResetEvent; + if (FQueue.Count > 0) then + begin + Result := FQueue.First; + FQueue.Delete(0); + end; + finally + UnLock; + end; + end; +end; + +function TIdBaseSocketIOHandling.WriteConnect(const AContext: TIdContext): string; +var + socket: TSocketIOContext; +begin + //if not FConnections.TryGetValue(AContext, socket) then + socket := NewConnection(AContext); + Result := WriteConnect(socket); +end; + +procedure TIdBaseSocketIOHandling.WriteDisConnect(const AContext: TIdContext); +var + socket: TSocketIOContext; +begin + if not FConnections.TryGetValue(AContext, socket) then + socket := NewConnection(AContext); + WriteDisConnect(socket); +end; + +procedure TIdBaseSocketIOHandling.WritePing(const AContext: TIdContext); +var + socket: TSocketIOContext; +begin + if not FConnections.TryGetValue(AContext, socket) then + socket := NewConnection(AContext); + WritePing(socket); +end; + +{ TIdSocketIOHandling } + +procedure TIdSocketIOHandling.Emit(const aEventName: string; + const aData: ISuperObject; const aCallback: TSocketIOMsgJSON); +var + context: TSocketIOContext; + jsonarray: string; + isendcount: Integer; +begin + if aData.IsType(stArray) then + jsonarray := aData.AsString + else if aData.IsType(stString) then + jsonarray := '["' + aData.AsString + '"]' + else + jsonarray := '[' + aData.AsString + ']'; + + Lock; + try + isendcount := 0; + + //note: is single connection? + for context in FConnections.Values do + begin + if context.IsDisconnected then Continue; + + if not Assigned(aCallback) then + WriteSocketIOEvent(context, ''{no room}, aEventName, jsonarray, nil) + else + WriteSocketIOEventRef(context, ''{no room}, aEventName, jsonarray, + procedure(const aData: string) + begin + aCallback(context, SO(aData), nil); + end); + Inc(isendcount); + end; + for context in FConnectionsGUID.Values do + begin + if context.IsDisconnected then Continue; + + if not Assigned(aCallback) then + WriteSocketIOEvent(context, ''{no room}, aEventName, jsonarray, nil) + else + WriteSocketIOEventRef(context, ''{no room}, aEventName, jsonarray, + procedure(const aData: string) + begin + aCallback(context, SO(aData), nil); + end); + Inc(isendcount); + end; + + if isendcount = 0 then + raise EIdSocketIoUnhandledMessage.Create('No socket.io connections!'); + finally + UnLock; + end; +end; + +procedure TIdSocketIOHandling.Send(const aMessage: string; + const aCallback: TSocketIOMsgJSON); +var + context: TSocketIOContext; + isendcount: Integer; +begin + Lock; + try + isendcount := 0; + + //note: is single connection? + for context in FConnections.Values do + begin + if context.IsDisconnected then Continue; +// if not context.IsSocketIO then +// raise EIdSocketIoUnhandledMessage.Create('Not a socket.io connection!'); + + if not Assigned(aCallback) then + WriteSocketIOMsg(context, ''{no room}, aMessage, nil) + else + WriteSocketIOMsg(context, ''{no room}, aMessage, + procedure(const aData: string) + begin + aCallback(context, SO(aData), nil); + end); + Inc(isendcount); + end; + for context in FConnectionsGUID.Values do + begin + if context.IsDisconnected then Continue; +// if not context.IsSocketIO then +// raise EIdSocketIoUnhandledMessage.Create('Not a socket.io connection!'); + + if not Assigned(aCallback) then + WriteSocketIOMsg(context, ''{no room}, aMessage, nil) + else + WriteSocketIOMsg(context, ''{no room}, aMessage, + procedure(const aData: string) + begin + aCallback(context, SO(aData), nil); + end); + Inc(isendcount); + end; + + if isendcount = 0 then + raise EIdSocketIoUnhandledMessage.Create('No socket.io connections!'); + finally + UnLock; + end; +end; + +end. diff --git a/IdWebsocketServer.pas b/IdWebsocketServer.pas new file mode 100644 index 0000000..16236f9 --- /dev/null +++ b/IdWebsocketServer.pas @@ -0,0 +1,156 @@ +unit IdWebsocketServer; + +interface + +uses + IdServerWebsocketHandling, IdServerSocketIOHandling, IdServerWebsocketContext, + IdHTTPServer, IdContext, IdCustomHTTPServer, Classes, IdIOHandlerWebsocket; + +type + TWebsocketMessageText = procedure(const AContext: TIdServerWSContext; const aText: string) of object; + TWebsocketMessageBin = procedure(const AContext: TIdServerWSContext; const aData: TStream) of object; + + TIdWebsocketServer = class(TIdHTTPServer) + private + FSocketIO: TIdServerSocketIOHandling_Ext; + FOnMessageText: TWebsocketMessageText; + FOnMessageBin: TWebsocketMessageBin; + function GetSocketIO: TIdServerSocketIOHandling; + protected + procedure DoCommandGet(AContext: TIdContext; ARequestInfo: TIdHTTPRequestInfo; + AResponseInfo: TIdHTTPResponseInfo); override; + procedure ContextCreated(AContext: TIdContext); override; + procedure ContextDisconnected(AContext: TIdContext); override; + + procedure WebsocketChannelRequest(const AContext: TIdServerWSContext; aType: TWSDataType; const aStrmRequest, aStrmResponse: TMemoryStream); + public + procedure AfterConstruction; override; + destructor Destroy; override; + + procedure SendMessageToAll(const aBinStream: TStream);overload; + procedure SendMessageToAll(const aText: string);overload; + + property OnMessageText: TWebsocketMessageText read FOnMessageText write FOnMessageText; + property OnMessageBin : TWebsocketMessageBin read FOnMessageBin write FOnMessageBin; + + property SocketIO: TIdServerSocketIOHandling read GetSocketIO; + end; + +implementation + +uses + IdServerIOHandlerWebsocket, IdStreamVCL, IdGlobal, Windows; + +{ TIdWebsocketServer } + +procedure TIdWebsocketServer.AfterConstruction; +begin + inherited; + + FSocketIO := TIdServerSocketIOHandling_Ext.Create; + + ContextClass := TIdServerWSContext; + if IOHandler = nil then + IOHandler := TIdServerIOHandlerWebsocket.Create(Self); +end; + +procedure TIdWebsocketServer.ContextCreated(AContext: TIdContext); +begin + inherited ContextCreated(AContext); + (AContext as TIdServerWSContext).OnCustomChannelExecute := Self.WebsocketChannelRequest; +end; + +procedure TIdWebsocketServer.ContextDisconnected(AContext: TIdContext); +begin + FSocketIO.FreeConnection(AContext); + inherited; +end; + +destructor TIdWebsocketServer.Destroy; +begin + inherited; + FSocketIO.Free; +end; + +procedure TIdWebsocketServer.DoCommandGet(AContext: TIdContext; + ARequestInfo: TIdHTTPRequestInfo; AResponseInfo: TIdHTTPResponseInfo); +begin + (AContext as TIdServerWSContext).OnCustomChannelExecute := Self.WebsocketChannelRequest; + (AContext as TIdServerWSContext).SocketIO := FSocketIO; + + if not TIdServerWebsocketHandling.ProcessServerCommandGet(AContext as TIdServerWSContext, ARequestInfo, AResponseInfo) then + inherited DoCommandGet(AContext, ARequestInfo, AResponseInfo); +end; + +function TIdWebsocketServer.GetSocketIO: TIdServerSocketIOHandling; +begin + Result := FSocketIO; +end; + +procedure TIdWebsocketServer.SendMessageToAll(const aText: string); +var + l: TList; + ctx: TIdServerWSContext; + i: Integer; +begin + l := Self.Contexts.LockList; + try + for i := 0 to l.Count - 1 do + begin + ctx := TIdServerWSContext(l.Items[i]); + Assert(ctx is TIdServerWSContext); + if ctx.IOHandler.IsWebsocket and + not ctx.IsSocketIO + then + ctx.IOHandler.Write(aText); + end; + finally + Self.Contexts.UnlockList; + end; +end; + +procedure TIdWebsocketServer.WebsocketChannelRequest( + const AContext: TIdServerWSContext; aType: TWSDataType; const aStrmRequest, + aStrmResponse: TMemoryStream); +var s: string; +begin + if aType = wdtText then + begin + with TStreamReader.Create(aStrmRequest) do + begin + s := ReadToEnd; + Free; + end; + if Assigned(OnMessageText) then + OnMessageText(AContext, s) + end + else if Assigned(OnMessageBin) then + OnMessageBin(AContext, aStrmRequest) +end; + +procedure TIdWebsocketServer.SendMessageToAll(const aBinStream: TStream); +var + l: TList; + ctx: TIdServerWSContext; + i: Integer; + bytes: TIdBytes; +begin + l := Self.Contexts.LockList; + try + TIdStreamHelperVCL.ReadBytes(aBinStream, bytes); + + for i := 0 to l.Count - 1 do + begin + ctx := TIdServerWSContext(l.Items[i]); + Assert(ctx is TIdServerWSContext); + if ctx.IOHandler.IsWebsocket and + not ctx.IsSocketIO + then + ctx.IOHandler.Write(bytes); + end; + finally + Self.Contexts.UnlockList; + end; +end; + +end.