portingtools/src/main.zig

280 lines
8.7 KiB
Zig

const std = @import("std");
const csv_reader = @import("csv_reader.zig");
const State = @import("State.zig");
const StringPacket = @import("StringPacket.zig");
/// Standard-library configuration override for this program: sets the log
/// level to `.debug`.
pub const std_options: std.Options = .{
    .log_level = .debug,
};
/// Builds the Unix-domain socket address `$XDG_RUNTIME_DIR/portingtools.sock`.
///
/// `alloc` is used only for a temporary path buffer, which is freed before
/// this function returns. Returns `error.MissingRuntimeDir` when the
/// `XDG_RUNTIME_DIR` environment variable is not set; allocation and
/// address-construction errors are propagated.
fn getAddr(alloc: std.mem.Allocator) !std.net.Address {
    const runtime_dir = std.posix.getenv("XDG_RUNTIME_DIR") orelse
        return error.MissingRuntimeDir;

    const components = [_][]const u8{ runtime_dir, "portingtools.sock" };
    const socket_path = try std.fs.path.join(alloc, &components);
    defer alloc.free(socket_path);

    return std.net.Address.initUnix(socket_path);
}
/// Program entry point. Dispatches on the first command-line argument:
/// `serve` runs the server, `map` runs the one-shot client.
///
/// Returns `error.InvalidArgs` when no command is given and
/// `error.InvalidCommand` for any other command word.
pub fn main() !void {
    const args = std.os.argv;
    if (args.len < 2) return error.InvalidArgs;

    const command = std.mem.span(args[1]);
    if (std.mem.eql(u8, command, "serve")) return runServer();
    if (std.mem.eql(u8, command, "map")) return map();

    return error.InvalidCommand;
}
/// Client mode: connects to the server socket, sends one line read from
/// stdin (whitespace-trimmed) as a `StringPacket`, and prints the reply
/// followed by a newline on stdout.
///
/// The input line must fit in a 128-byte buffer. Returns `error.NoInput`
/// when stdin is at end-of-file before any data is read.
fn map() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();
    const alloc = gpa.allocator();

    // Connect first; stdin is only consumed once the server is reachable.
    const addr = try getAddr(alloc);
    const socket_flags = std.posix.SOCK.STREAM | std.posix.SOCK.CLOEXEC;
    const sockfd = try std.posix.socket(std.posix.AF.UNIX, socket_flags, 0);
    defer std.posix.close(sockfd);
    try std.posix.connect(sockfd, &addr.any, addr.getOsSockLen());
    const stream = std.net.Stream{ .handle = sockfd };

    // Read a single request line.
    var line_buf: [128]u8 = undefined;
    const input = std.io.getStdIn().reader();
    const maybe_line = try input.readUntilDelimiterOrEof(&line_buf, '\n');
    const line = maybe_line orelse return error.NoInput;

    const query = StringPacket{ .str = std.mem.trim(u8, line, &std.ascii.whitespace) };
    try query.write(stream.writer());

    // The reply string is allocated by StringPacket.read and freed here.
    const reply = try StringPacket.read(stream.reader(), alloc);
    defer alloc.free(reply.str);

    const output = std.io.getStdOut().writer();
    try output.print("{s}\n", .{reply.str});
}
/// Loads the rename and mapping tables from the current working directory
/// and publishes them as `state.mdata` under the exclusive lock.
///
/// Inputs:
///   - `renames.csv` (optional): two-column records `from,to`. If the file
///     cannot be opened a warning is logged and renames are left empty.
///   - `mappings/` (required): every non-directory entry is parsed as CSV.
///     Column 0 is the lookup key, column 1 the mapped name and, when at
///     least four columns are present, column 3 the documentation string.
///
/// Malformed and duplicate records are skipped with a warning. Record
/// storage comes from the arena inside the new `MapData`; on any error the
/// partially built data is released and `state.mdata` is left untouched.
fn loadData(state: *State) !void {
    var data = State.MapData{
        .arena = std.heap.ArenaAllocator.init(state.alloc),
        .mappings = std.StringHashMap(State.Mapping).init(state.alloc),
        .renames = std.StringHashMap([]const u8).init(state.alloc),
    };
    errdefer {
        data.arena.deinit();
        data.mappings.deinit();
        data.renames.deinit();
    }
    // `data` does not move until it is published below, so this handle stays valid.
    const arena = data.arena.allocator();

    if (std.fs.cwd().openFile("renames.csv", .{})) |renames_file| {
        defer renames_file.close();
        var reader = csv_reader.csvReader(std.io.bufferedReader(renames_file.reader()));
        while (try reader.next(arena)) |rec| {
            if (rec.cols.len != 2) {
                std.log.warn("found rename with invalid record length, skipping", .{});
                continue;
            }
            // Single lookup: the first occurrence wins, later duplicates are reported.
            const slot = try data.renames.getOrPut(rec.cols[0]);
            if (slot.found_existing) {
                std.log.warn("duplicate rename '{s}'", .{rec.cols[0]});
                continue;
            }
            slot.value_ptr.* = rec.cols[1];
        }
        std.log.info("loaded {} renames", .{data.renames.count()});
    } else |err| {
        std.log.warn("couldn't open renames file: {}, skipping", .{err});
    }

    var mappings_dir = try std.fs.cwd().openDir("mappings", .{ .iterate = true });
    defer mappings_dir.close();
    var mappings_iter = mappings_dir.iterate();
    while (try mappings_iter.next()) |entry| {
        // A subdirectory cannot be read as CSV; skip it instead of failing the whole load.
        if (entry.kind == .directory) {
            std.log.warn("skipping directory '{s}' in mappings", .{entry.name});
            continue;
        }
        // Open relative to the directory handle; no path needs to be built or freed.
        const mapfile = try mappings_dir.openFile(entry.name, .{});
        defer mapfile.close();
        var reader = csv_reader.csvReader(std.io.bufferedReader(mapfile.reader()));
        while (try reader.next(arena)) |rec| {
            if (rec.cols.len < 2) {
                std.log.warn("found mapping with invalid length, skipping", .{});
                continue;
            }
            const slot = try data.mappings.getOrPut(rec.cols[0]);
            if (slot.found_existing) {
                std.log.warn("duplicate mapping '{s}'", .{rec.cols[0]});
                continue;
            }
            slot.value_ptr.* = State.Mapping{
                .mapped = rec.cols[1],
                .doc = if (rec.cols.len >= 4) rec.cols[3] else null,
            };
        }
    }

    // Nothing below can fail, so ownership of `data` passes to `state` here.
    state.lock.lock();
    defer state.lock.unlock();
    state.mdata = data;
    std.log.info("loaded {} mappings", .{data.mappings.count()});
}
/// Serves lookup requests on one client connection until reading or writing
/// fails (the loop has no other exit; errors propagate to the caller).
///
/// Each request is a `StringPacket` holding an unmapped name. The reply is:
///   - `<not loaded>` when `state.mdata` is still null,
///   - `<unknown>` when the name has no mapping,
///   - otherwise the rename of the mapped name if one exists, else the
///     mapped name itself.
/// The shared lock on `state` is held from lookup through the reply write.
fn handleConnection(state: *State, con: std.net.Server.Connection) !void {
    while (true) {
        const req = try StringPacket.read(con.stream.reader(), state.alloc);
        defer state.alloc.free(req.str);

        state.lock.lockShared();
        defer state.lock.unlockShared();

        const tables = state.mdata orelse {
            const reply = StringPacket{ .str = "<not loaded>" };
            try reply.write(con.stream.writer());
            continue;
        };

        const mapping = tables.mappings.get(req.str) orelse {
            const reply = StringPacket{ .str = "<unknown>" };
            try reply.write(con.stream.writer());
            continue;
        };

        const renamed = tables.renames.get(mapping.mapped);
        const reply = StringPacket{ .str = renamed orelse mapping.mapped };
        try reply.write(con.stream.writer());

        // Log the full resolution chain after the reply has been sent.
        std.log.info(
            \\
            \\ Unmapped: {s}
            \\ Mapped: {s}
            \\ Renamed: {s}
            \\
            \\ Doc: {s}
        , .{
            req.str,
            mapping.mapped,
            renamed orelse "<no rename>",
            mapping.doc orelse "<no docs>",
        });
    }
}
/// Server mode: loads the mapping data on a worker thread, listens on the
/// Unix socket returned by `getAddr`, and hands each accepted connection to
/// the thread pool. Runs until SIGTERM or SIGINT is received, then returns
/// so the deferred cleanup (socket file removal, pool join, state teardown)
/// runs.
///
/// Errors from setup, `accept`, task spawning and the signalfd read are
/// propagated to the caller.
fn runServer() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();
    const alloc = gpa.allocator();

    var state = State{
        .alloc = alloc,
        .lock = .{},
    };
    defer state.deinit();

    // Block SIGTERM/SIGINT for normal delivery; they are consumed through
    // the signalfd created below instead.
    const sigs = comptime sigset: {
        var sigset = std.os.linux.empty_sigset;
        std.os.linux.sigaddset(&sigset, std.posix.SIG.TERM);
        std.os.linux.sigaddset(&sigset, std.posix.SIG.INT);
        break :sigset sigset;
    };
    std.posix.sigprocmask(std.posix.SIG.BLOCK, &sigs, null);

    var pool: std.Thread.Pool = undefined;
    try std.Thread.Pool.init(&pool, .{
        .allocator = alloc,
        .n_jobs = 4, // plenty
    });
    defer pool.deinit(); // joins threads

    // Load data in the background so the socket can start accepting right away.
    try pool.spawn((struct {
        fn f(s: *State) void {
            loadData(s) catch |e| {
                std.log.err("failed to load data: {}", .{e});
                // Publish empty tables under the exclusive lock: connection
                // handlers may already be reading `mdata` under the shared lock.
                s.lock.lock();
                defer s.lock.unlock();
                s.mdata = .{
                    .arena = std.heap.ArenaAllocator.init(s.alloc),
                    .mappings = std.StringHashMap(State.Mapping).init(s.alloc),
                    .renames = std.StringHashMap([]const u8).init(s.alloc),
                };
            };
        }
    }).f, .{&state});

    const addr = try getAddr(alloc);
    var server = try addr.listen(.{});
    defer {
        server.deinit();
        // Remove the socket file so the next start can bind the same path.
        const path = std.mem.sliceTo(&addr.un.path, 0);
        std.fs.cwd().deleteFile(path) catch |e|
            std.log.warn("failed to delete socket: {}", .{e});
    }
    std.log.info("listening on {}", .{addr});

    const sigfd = try std.posix.signalfd(-1, &sigs, 0);
    defer std.posix.close(sigfd);
    const epfd = try std.posix.epoll_create1(0);
    defer std.posix.close(epfd);

    // Watch both the listening socket and the signalfd for readability.
    for ([_]std.posix.fd_t{ server.stream.handle, sigfd }) |fd| {
        var ev = std.os.linux.epoll_event{
            .data = .{ .fd = fd },
            .events = std.os.linux.EPOLL.IN,
        };
        try std.posix.epoll_ctl(epfd, std.os.linux.EPOLL.CTL_ADD, fd, &ev);
    }

    var ev_buf: [32]std.os.linux.epoll_event = undefined;
    outer: while (true) {
        // The posix wrapper returns a plain event count; the raw syscall
        // return may be an errno-encoded value that must not be used as a length.
        const ready = std.posix.epoll_wait(epfd, &ev_buf, -1);
        for (ev_buf[0..ready]) |ev| {
            if (ev.data.fd == server.stream.handle) {
                const con = try server.accept();
                // If the task cannot be queued, nobody else will close the connection.
                errdefer con.stream.close();
                try pool.spawn((struct {
                    fn f(s: *State, conn: std.net.Server.Connection) void {
                        defer conn.stream.close();
                        handleConnection(s, conn) catch |e| switch (e) {
                            // The client hanging up is the normal way a session ends.
                            error.EndOfStream => {},
                            else => std.log.warn("handing connection: {}", .{e}),
                        };
                    }
                }).f, .{ &state, con });
            } else if (ev.data.fd == sigfd) {
                // A signalfd yields `signalfd_siginfo` records, not `siginfo_t`.
                var siginfo: std.os.linux.signalfd_siginfo = undefined;
                const got = try std.posix.read(sigfd, std.mem.asBytes(&siginfo));
                if (got != @sizeOf(std.os.linux.signalfd_siginfo)) {
                    return error.ShortSignalfdRead;
                }
                std.log.info("got signal {}, bye!", .{siginfo.signo});
                break :outer;
            }
        }
    }
}