[WIP] Proper handling for connecting shards

fixed client deserialization in channel create event
adjust shard count in nyxx and client options,
[ci skip]
This commit is contained in:
Szymon Uglis 2020-06-02 00:11:02 +02:00
parent d745c5e65f
commit a628903f43
7 changed files with 102 additions and 91 deletions

View file

@ -7,11 +7,8 @@ class ClientOptions {
/// **It means client won't send any of these. It doesn't mean filtering guild messages.**
AllowedMentions? allowedMentions;
/// The index of this shard
int shardIndex;
/// The total number of shards.
int shardCount;
int? shardCount;
/// The number of messages to cache for each channel.
int messageCacheSize;
@ -39,8 +36,7 @@ class ClientOptions {
/// Makes a new `ClientOptions` object.
ClientOptions(
{this.allowedMentions,
this.shardIndex = 0,
this.shardCount = 1,
this.shardCount,
this.messageCacheSize = 400,
this.forceFetchMembers = false,
this.cacheMembers = true,

View file

@ -331,7 +331,7 @@ class Nyxx implements Disposable {
}
/// Returns number of shards
int get shards => this._options.shardCount;
int get shards => this.shardManager._shards.length;
/*
/// Sets presence for bot.

View file

@ -2,72 +2,82 @@ part of nyxx;
class ShardManager implements Disposable {
/// Emitted when the shard is ready.
late Stream<ShardNew> onConnected = this._onConnect.stream;
late Stream<Shard> onConnected = this._onConnect.stream;
/// Emitted when the shard encounters an error.
late Stream<ShardNew> onDisconnect = this._onDisconnect.stream;
late Stream<Shard> onDisconnect = this._onDisconnect.stream;
/// Emitted when shard receives member chunk.
late Stream<MemberChunkEvent> onMemberChunk = this._onMemberChunk.stream;
final StreamController<ShardNew> _onConnect = StreamController<ShardNew>.broadcast();
final StreamController<ShardNew> _onDisconnect = StreamController<ShardNew>.broadcast();
final StreamController<Shard> _onConnect = StreamController<Shard>.broadcast();
final StreamController<Shard> _onDisconnect = StreamController<Shard>.broadcast();
final StreamController<MemberChunkEvent> _onMemberChunk = StreamController.broadcast();
final Logger _logger = Logger("Shard Manager");
final _WS _ws;
final int _numShards;
final Map<int, ShardNew> _shards = {};
final Map<int, Shard> _shards = {};
/// Starts shard manager
ShardManager(this._ws, this._numShards) {
for(final shardId in Iterable.generate(_numShards, (i) => i)) {
final shard = ShardNew(shardId, this, _ws.gateway);
_shards[shardId] = shard;
_connect(_numShards - 1);
}
void _connect(int shardId) {
if(shardId < 0) {
return;
}
final shard = Shard(shardId, this, _ws.gateway);
_shards[shardId] = shard;
Future.delayed(const Duration(seconds: 1), () => _connect(shardId - 1));
}
@override
Future<void> dispose() {
// TODO: implement dispose
throw UnimplementedError();
Future<void> dispose() async {
for(final shard in this._shards.values) {
await shard.dispose();
}
await this._onConnect.close();
await this._onDisconnect.close();
await this._onMemberChunk.close();
}
}
class ShardNew implements Disposable {
class Shard implements Disposable {
/// Id of shard
final int id;
late final Isolate shardIsolate;
late final ReceivePort receivePort;
late final SendPort isolateSendPort;
late final Stream<dynamic> receiveStream;
/// Reference to [ShardManager]
ShardManager manager;
late final Isolate _shardIsolate; // Reference to isolate
late final Stream<dynamic> _receiveStream; // Broadcast stream on which data from isolate is received
late final ReceivePort _receivePort; // Port on which data from isolate is received
late final SendPort _isolateSendPort; // Port on which data can be sent to isolate
String? _sessionId;
int _sequence = 0;
bool _acked = false;
late Timer _heartbeatTimer;
bool connected = false;
late SendPort sendPort;
/// Isolate
ShardNew(this.id, this.manager, String gatewayUrl) {
this.receivePort = ReceivePort();
this.receiveStream = receivePort.asBroadcastStream();
this.isolateSendPort = receivePort.sendPort;
Shard(this.id, this.manager, String gatewayUrl) {
this._receivePort = ReceivePort();
this._receiveStream = _receivePort.asBroadcastStream();
this._isolateSendPort = _receivePort.sendPort;
Isolate.spawn(_shardHandler, isolateSendPort, errorsAreFatal: false).then((value) async {
this.shardIsolate = value;
this.sendPort = await receiveStream.first as SendPort;
Isolate.spawn(_shardHandler, _isolateSendPort, errorsAreFatal: false).then((value) async {
this._shardIsolate = value;
this.sendPort = await _receiveStream.first as SendPort;
this.sendPort.send({"cmd" : "INIT", "gatewayUrl" : gatewayUrl });
this.receiveStream.listen(_handle);
this._receiveStream.listen(_handle);
});
}
@ -77,48 +87,47 @@ class ShardNew implements Disposable {
}
void _heartbeat() {
if (!this._acked) manager._logger.warning("No ACK received");
this.send(OPCodes.heartbeat, _sequence);
this._acked = false;
}
Future<void> _handle(dynamic data) async {
if(data["cmd"] == "ERROR") {
manager._logger.shout("Error on shard $id. Error message: ${data["error"]}");
return;
}
final msg = data["jsonData"] as Map<String, dynamic>;
final resume = data["resume"] as bool;
print("got deserilized data on shard ${this.id}: ${jsonEncode(data)}");
if (msg["op"] == OPCodes.dispatch && manager._ws._client._options.ignoredEvents.contains(msg["t"] as String)) {
return;
}
if (msg["s"] != null) this._sequence = msg["s"] as int;
if (msg["s"] != null) {
this._sequence = msg["s"] as int;
}
switch (msg["op"] as int) {
case OPCodes.heartbeatAck:
this._acked = true;
break;
case OPCodes.hello:
if (this._sessionId == null || !resume) {
final identifyMsg = <String, dynamic>{
"token": manager._ws._client._token,
"properties": <String, dynamic>{
"properties": <String, dynamic> {
"\$os": Platform.operatingSystem,
"\$browser": "nyxx",
"\$device": "nyxx",
},
"large_threshold": manager._ws._client._options.largeThreshold
//"compress": "zlib-stream"
"large_threshold": manager._ws._client._options.largeThreshold,
"compress": false
};
if (manager._ws._client._options.gatewayIntents != null) {
identifyMsg["intents"] = manager._ws._client._options.gatewayIntents!._calculate();
}
//identifyMsg["shard"] = <int>[this.id, manager._ws._client._options.shardCount];
identifyMsg["shard"] = <int>[this.id, 2];asdf
print("Shard config: ${jsonEncode(identifyMsg["shard"])}");
identifyMsg["shard"] = <int>[this.id, manager._numShards];
this.send(OPCodes.identify, identifyMsg);
} else if (resume) {
@ -132,15 +141,14 @@ class ShardNew implements Disposable {
break;
case OPCodes.invalidSession:
manager._logger.severe("Invalid session. Reconnecting...");
manager._logger.severe("Invalid session on shard $id. ${(msg["d"] as bool) ? "Resuming..." : "Reconnecting..."}");
_heartbeatTimer.cancel();
//manager._ws._client._events.onDisconnect.add(DisconnectEvent._new(this, 9));
//this._onDisconnect.add(this);
manager._ws._client._events.onDisconnect.add(DisconnectEvent._new(this, DisconnectEventReason.invalidSession));
if (msg["d"] as bool) {
Future.delayed(const Duration(seconds: 3), () => this.sendPort.send({ "cmd" : "RECONNECT"}));
Future.delayed(const Duration(seconds: 1), () => this.sendPort.send({ "cmd" : "RECONNECT"}));
} else {
Future.delayed(const Duration(seconds: 6), () => this.sendPort.send({ "cmd" : "CONNECT"}));
Future.delayed(const Duration(seconds: 2), () => this.sendPort.send({ "cmd" : "CONNECT"}));
}
break;
@ -155,7 +163,6 @@ class ShardNew implements Disposable {
this.connected = true;
manager._logger.info("Shard ${this.id} connected");
//this._onConnect.add(this);
if (!resume) {
await manager._ws.propagateReady();
@ -300,15 +307,19 @@ class ShardNew implements Disposable {
}
@override
Future<void> dispose() {
// TODO: implement dispose
throw UnimplementedError();
Future<void> dispose() async {
this._isolateSendPort.send({"cmd" : "TERMINATE" });
await this._receiveStream.firstWhere((element) => (element as Map<String, dynamic>)["cmd"] == "TERMINATE_OK");
this._shardIsolate.kill();
}
}
// Decodes zlib compresses string into string json
Map<String, dynamic> _decodeBytes(dynamic bytes) {
if (bytes is String) return jsonDecode(bytes) as Map<String, dynamic>;
if (bytes is String) {
return jsonDecode(bytes) as Map<String, dynamic>;
}
final decoded = zlib.decoder.convert(bytes as List<int>);
final rawStr = utf8.decode(decoded);
@ -351,19 +362,15 @@ Future<void> _shardHandler(SendPort shardPort) async {
await transport.WebSocket.connect(Uri.parse("$gatewayUri?v=6&encoding=json")).then((ws) {
_socket = ws;
_socket!.listen((data) {
print("got data");
shardPort.send({ "cmd" : "DATA", "jsonData" : _decodeBytes(data), "resume" : resume});
},
onDone: () => print("Shard done. ${_socket!.closeCode}"), onError: (err) => print(err));
}, onError: (_, __) => Future.delayed(const Duration(seconds: 6), _connect));
}, onError: (err) => shardPort.send({ "cmd" : "ERROR", "error": err.toString()}));
}, onError: (_, __) => Future.delayed(const Duration(seconds: 2), _connect));
}
// Connects
await _connect(false, true);
await for(final message in receiveStream) {
print("got data to send");
final cmd = message["cmd"];
if(cmd == "SEND") {
@ -379,5 +386,9 @@ Future<void> _shardHandler(SendPort shardPort) async {
await _socket?.close(1000);
_connect(false, true);
}
if(cmd == "TERMINATE") {
await _socket?.close(1000);
}
}
}

View file

@ -40,7 +40,7 @@ class Channel extends SnowflakeEntity {
return CachelessVoiceChannel._new(raw, Snowflake(raw["guild_id"]), client);
}
return CacheVoiceChannel._new(raw, channelGuild ,client);
return CacheVoiceChannel._new(raw, channelGuild, client);
break;
case 4:
return CategoryChannel._new(raw, channelGuild == null ? Snowflake(raw["guild_id"]) : channelGuild.id, client);

View file

@ -7,7 +7,7 @@ class ChannelCreateEvent {
late final Channel channel;
ChannelCreateEvent._new(Map<String, dynamic> raw, Nyxx client) {
this.channel = Channel._deserialize(raw, client);
this.channel = Channel._deserialize(raw["d"] as Map<String, dynamic>, client);
client.channels[channel.id] = channel;
if (this.channel is CacheGuildChannel) {

View file

@ -3,10 +3,16 @@ part of nyxx;
/// Sent when a shard disconnects from the websocket.
class DisconnectEvent {
/// The shard that got disconnected.
dynamic shard;
Shard shard;
/// The close code.
int closeCode;
/// Reason of disconnection
DisconnectEventReason reason;
DisconnectEvent._new(this.shard, this.closeCode);
DisconnectEvent._new(this.shard, this.reason);
}
class DisconnectEventReason extends IEnum<int> {
static const DisconnectEventReason invalidSession = const DisconnectEventReason._from(9);
const DisconnectEventReason._from(int value) : super(value);
}

View file

@ -9,57 +9,55 @@ class _WS {
late int remaining;
late DateTime resetAt;
late int recommendedShardsNum;
final Logger logger = Logger("Client");
final Logger _logger = Logger("Client");
/// Makes a new WS manager.
_WS(this._client) {
_client._http._execute(BasicRequest._new("/gateway/bot")).then((httpResponse) {
if (httpResponse is HttpResponseError) {
this.logger.severe("Cannot get gateway url");
this._logger.severe("Cannot get gateway url");
exit(1);
}
final response = httpResponse as HttpResponseSuccess;
this.gateway = response.jsonBody["url"] as String;
this.remaining = response.jsonBody["session_start_limit"]["remaining"] as int;
this.resetAt =
DateTime.now().add(Duration(milliseconds: response.jsonBody["session_start_limit"]["reset_after"] as int));
logger.info("Remaining ${this.remaining} connections starts. Limit will reset at ${this.resetAt}");
this.resetAt = DateTime.now().add(Duration(milliseconds: response.jsonBody["session_start_limit"]["reset_after"] as int));
this.recommendedShardsNum = response.jsonBody["shards"] as int;
checkForConnections();
setupShard(_client._options.shardIndex);
this.connectShard(0);
this._client.shardManager = ShardManager(this, this._client._options.shardCount != null ? this._client._options.shardCount! : this.recommendedShardsNum);
});
}
void checkForConnections() {
if (this.remaining < 50) logger.warning("50 connection starts left.");
_logger.info("Remaining ${this.remaining} connections starts. Limit will reset at ${this.resetAt}");
if (this.remaining < 50) {
_logger.warning("50 connection starts left.");
}
if (this.remaining < 10) {
logger.severe("Exiting to prevent API abuse. 10 connections starts left.");
_logger.severe("Exiting to prevent API abuse. 10 connections starts left.");
exit(1);
}
}
void setupShard(int shardId) {
this._client.shardManager = ShardManager(this, 2);
}
void connectShard(int index) {
//_client.shard._connect(false, true);
}
Future<void> propagateReady() async {
if(_client.ready) {
return;
}
_client.ready = true;
final httpResponse = await _client._http._execute(BasicRequest._new("/oauth2/applications/@me"));
if (httpResponse is HttpResponseError) {
this.logger.severe("Cannot get bot identity");
this._logger.severe("Cannot get bot identity");
exit(1);
}
@ -68,6 +66,6 @@ class _WS {
_client.app = ClientOAuth2Application._new(response.jsonBody as Map<String, dynamic>, _client);
_client._events.onReady.add(ReadyEvent._new(_client));
logger.info("Connected and ready! Logged as `${_client.self.tag}`");
_logger.info("Connected and ready! Logged as `${_client.self.tag}`");
}
}