nst-0x05/nst-0x05/src/imserver/Server.java
2021-02-06 12:40:14 +01:00

243 lines
7.9 KiB
Java

package imserver;
import java.io.*;
import java.net.*;
import java.util.concurrent.*;
import java.util.*;
import java.util.regex.Pattern;
import java.util.regex.Matcher;
/**
 * Registry of the {@link SocketHandler}s of the currently connected clients.
 *
 * <p>Every method is {@code synchronized} on this instance, so the set is never
 * iterated while another thread adds or removes a handler. Messages are handed
 * over through each handler's bounded {@code messages} queue using the
 * non-blocking {@code offer}; when a queue is full the message is dropped and a
 * note is printed to {@code System.err}.
 */
class ActiveHandlers {
    /** Prefix of notices that are echoed back to the client that caused them. */
    private static final String INFO_PREFIX = "INFO> ";
    /** Prefix of error replies; they concern only the client that caused them. */
    private static final String ERROR_PREFIX = "ERROR> @";
    /** Index of the "@addressee" token in "|DM| @sender to @addressee > text". */
    private static final int DM_ADDRESSEE_INDEX = 3;

    private final Set<SocketHandler> activeHandlersSet = new HashSet<SocketHandler>();

    /**
     * Distributes a message to the registered handlers.
     *
     * <ul>
     *   <li>The sender receives the message only when it is an {@code INFO> }
     *       notice or an {@code ERROR> @} reply. Previously error replies were
     *       delivered to nobody, so a client never learned why its command was
     *       rejected.</li>
     *   <li>Every other handler receives everything except {@code ERROR> @}
     *       replies.</li>
     * </ul>
     *
     * @param sender  handler whose input produced the message
     * @param message text to enqueue
     */
    synchronized void sendMessageToAll(SocketHandler sender, String message) {
        boolean info = message.startsWith(INFO_PREFIX);
        boolean error = message.startsWith(ERROR_PREFIX);
        for (SocketHandler handler : activeHandlersSet) {
            boolean deliver = (handler == sender) ? (info || error) : !error;
            if (deliver) {
                enqueue(handler, message);
            }
        }
    }

    /**
     * Delivers a direct message to every handler whose {@code clientID} equals
     * the addressee named in the message.
     *
     * <p>The addressee is the fourth space-separated token with its first
     * {@code @} removed. A message with fewer tokens is reported on
     * {@code System.err} and dropped instead of raising
     * {@link ArrayIndexOutOfBoundsException}.
     *
     * @param sender  handler that issued the direct message (currently unused)
     * @param message formatted direct message, e.g. {@code "|DM| @a to @b > hi"}
     */
    synchronized void senddm(SocketHandler sender, String message) {
        String[] tokens = message.split(" ");
        if (tokens.length <= DM_ADDRESSEE_INDEX) {
            System.err.println("Malformed direct message, dropping it: " + message);
            return;
        }
        String whofor = tokens[DM_ADDRESSEE_INDEX].replaceFirst("@", "");
        for (SocketHandler handler : activeHandlersSet) {
            if (handler.clientID.equals(whofor)) {
                enqueue(handler, message);
            }
        }
    }

    /**
     * Registers a handler.
     *
     * @return {@code true} if the handler was not registered before
     */
    synchronized boolean add(SocketHandler handler) {
        return activeHandlersSet.add(handler);
    }

    /**
     * Unregisters a handler.
     *
     * @return {@code true} if the handler was registered
     */
    synchronized boolean remove(SocketHandler handler) {
        return activeHandlersSet.remove(handler);
    }

    /** Offers the message to the handler's queue; logs and drops it when the queue is full. */
    private void enqueue(SocketHandler handler, String message) {
        if (!handler.messages.offer(message)) {
            System.err.printf("Client %s message queue is full, dropping the message!\n", handler.clientID);
        }
    }
}
/**
 * Per-connection state of one chat client: the socket, the client's current
 * identifier, the bounded queue of outgoing messages and the two runnables
 * ({@link InputHandler}, {@link OutputHandler}) that serve the connection.
 *
 * <p>Both runnables count down and await {@code startSignal}, so neither does
 * any work until the other one has been started as well.
 */
class SocketHandler {
    /** Matches a single whitespace character; used to validate usernames. */
    private static final Pattern WHITESPACE = Pattern.compile("\\s");
    /** Queued by the input side to unblock an output handler waiting in {@code take()}. */
    private static final String WAKEUP_MESSAGE = "OutputHandler, wakeup and die!";

    Socket mySocket;
    // Written by this client's input thread, read by other clients' threads
    // (via ActiveHandlers) and by the output thread, hence volatile.
    volatile String clientID;
    ActiveHandlers activeHandlers;
    ArrayBlockingQueue<String> messages = new ArrayBlockingQueue<String>(20);
    CountDownLatch startSignal = new CountDownLatch(2);
    OutputHandler outputHandler = new OutputHandler();
    InputHandler inputHandler = new InputHandler();
    volatile boolean inputFinished = false;
    String username = "";

    /**
     * @param mySocket       accepted client connection
     * @param activeHandlers shared registry used to reach the other clients
     */
    public SocketHandler(Socket mySocket, ActiveHandlers activeHandlers) {
        this.mySocket = mySocket;
        this.clientID = defaultClientId();
        this.activeHandlers = activeHandlers;
    }

    /**
     * Interprets one line received from the client and returns the text to
     * route onwards.
     *
     * <ul>
     *   <li>{@code #set-username NAME} changes {@code clientID} and returns an
     *       {@code INFO> } notice. A name containing whitespace, or a missing
     *       name, yields an {@code ERROR> @} reply.</li>
     *   <li>{@code #reset-username} restores the address:port identifier and
     *       returns an {@code INFO> } notice.</li>
     *   <li>{@code #dm NAME TEXT} returns {@code "|DM| @sender to @NAME > TEXT"}.
     *       A missing addressee, a missing text, or addressing oneself yields
     *       an {@code ERROR> @} reply.</li>
     *   <li>Anything else is returned as {@code "From client ID: line"}.</li>
     * </ul>
     *
     * @param message raw line read from the client
     * @return the formatted message; never {@code null}
     */
    public String commandzpls(String message) {
        if (message.startsWith("#set-username ")) {
            return setUsername(message);
        }
        if (message.startsWith("#reset-username")) {
            String previous = clientID;
            clientID = defaultClientId();
            return "INFO> The username for " + previous + " has been reset to " + clientID;
        }
        if (message.startsWith("#dm ")) {
            return directMessage(message);
        }
        return "From client " + clientID + ": " + message;
    }

    /** Handles {@code #set-username NAME}; see {@link #commandzpls(String)}. */
    private String setUsername(String message) {
        final int prefixLength = "#set-username ".length();
        if (message.length() <= prefixLength) {
            return error(" Username too short");
        }
        String candidate = message.substring(prefixLength);
        // The former pattern "[\\s+]" was a character class that also rejected a
        // literal '+', which the error text never mentioned.
        if (WHITESPACE.matcher(candidate).find()) {
            return error(" whitespace is not allowed in usernames, thus " + candidate + " won't do");
        }
        String previous = clientID;
        clientID = candidate;
        String infomsg = "INFO> " + previous + " is from now on known as " + clientID;
        System.err.println(infomsg);
        return infomsg;
    }

    /** Handles {@code #dm NAME TEXT}; see {@link #commandzpls(String)}. */
    private String directMessage(String message) {
        // Limit 3 keeps spaces inside the message text intact.
        String[] parts = message.split(" ", 3);
        if (parts.length < 2 || parts[1].isEmpty()) {
            return error(" you have to provide a clientID of the addressee");
        }
        if (clientID.equals(parts[1])) {
            return error(" cannot send DMs to yourself..");
        }
        if (parts.length < 3) {
            // "#dm NAME" without text used to throw ArrayIndexOutOfBoundsException
            // and kill the input handler.
            return error(" you have to provide a message for the addressee");
        }
        return "|DM| @" + clientID + " to @" + parts[1] + " > " + parts[2];
    }

    /** Builds an {@code ERROR> @clientID...} reply, logs it to stderr and returns it. */
    private String error(String detail) {
        String errmsg = "ERROR> @" + clientID + detail;
        System.err.println(errmsg);
        return errmsg;
    }

    /** Returns the initial identifier: remote address, a colon, and the remote port. */
    private String defaultClientId() {
        return mySocket.getInetAddress().toString() + ":" + mySocket.getPort();
    }

    /**
     * Writes queued messages to the client until the input side has finished,
     * then closes the socket.
     */
    class OutputHandler implements Runnable {
        @Override
        public void run() {
            try {
                System.err.println("DBG>Output handler starting for " + clientID);
                startSignal.countDown();
                startSignal.await();
                System.err.println("DBG>Output handler running for " + clientID);
                Writer writer = new OutputStreamWriter(mySocket.getOutputStream(), "UTF-8");
                writer.write("\nYou are connected from " + clientID + "\n");
                writer.flush();
                while (!inputFinished) {
                    String m = messages.take();
                    if (inputFinished) {
                        // Woken up only to terminate; do not send the internal
                        // wake-up text to the client.
                        break;
                    }
                    writer.write(m + "\r\n");
                    writer.flush();
                    System.err.println("DBG>Message sent to " + clientID + ":" + m + "\n");
                }
            } catch (IOException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
                Thread.currentThread().interrupt();
            } finally {
                // Single place where the connection is released. Closing the socket
                // also unblocks an input handler that is still waiting in readLine().
                try {
                    mySocket.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            System.err.println("DBG>Output handler for " + clientID + " has finished.");
        }
    }

    /**
     * Registers this handler, then reads lines from the client, turns each into
     * a message via {@link #commandzpls(String)} and routes it through
     * {@code activeHandlers}.
     */
    class InputHandler implements Runnable {
        @Override
        public void run() {
            try {
                System.err.println("DBG>Input handler starting for " + clientID);
                startSignal.countDown();
                startSignal.await();
                System.err.println("DBG>Input handler running for " + clientID);
                activeHandlers.add(SocketHandler.this);
                BufferedReader reader =
                        new BufferedReader(new InputStreamReader(mySocket.getInputStream(), "UTF-8"));
                String request;
                while ((request = reader.readLine()) != null) {
                    request = commandzpls(request);
                    System.out.println(request);
                    if ("|DM|".equals(request.split(" ")[0])) {
                        activeHandlers.senddm(SocketHandler.this, request);
                    } else {
                        activeHandlers.sendMessageToAll(SocketHandler.this, request);
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
                Thread.currentThread().interrupt();
            } finally {
                activeHandlers.remove(SocketHandler.this);
                // Done on every exit path, exceptions included; otherwise the output
                // handler would stay blocked in take() and occupy a pool thread forever.
                inputFinished = true;
                messages.offer(WAKEUP_MESSAGE);
            }
            System.err.println("DBG>Input handler for " + clientID + " has finished.");
        }
    }
}
/**
 * Entry point of the IM server: accepts TCP connections and runs an input and
 * an output handler for each of them on a fixed-size thread pool.
 */
public class Server {
    private static final int DEFAULT_PORT = 1982;
    private static final int DEFAULT_MAX_CONNECTIONS = 4;

    /**
     * Starts the server.
     *
     * @param args optional {@code [PORT] [MAX_CONNECTIONS]}; {@code --help} as
     *             the first argument prints the usage and exits. Values that
     *             are not integers or are out of range fall back to the
     *             defaults.
     */
    public static void main(String[] args) {
        int port = DEFAULT_PORT, max_conn = DEFAULT_MAX_CONNECTIONS;
        if (args.length > 0) {
            if (args[0].startsWith("--help")) {
                System.out.printf("Usage: Server [PORT] [MAX_CONNECTIONS]\n" +
                        "If PORT is not specified, default port %d is used\n" +
                        "If MAX_CONNECTIONS is not specified, default number=%d is used", port, max_conn);
                return;
            }
            // ServerSocket rejects ports outside 0..65535 with IllegalArgumentException.
            port = parseArg(args[0], port, 0, 65535);
            if (args.length > 1) {
                // The pool needs at least one thread; the upper bound keeps 2*max_conn from overflowing.
                max_conn = parseArg(args[1], max_conn, 1, Integer.MAX_VALUE / 2);
            }
        }
        System.out.printf("IM server listening on port %d, maximum nr. of connections=%d...\n", port, max_conn);
        // Two threads per client: one for the input handler, one for the output handler.
        ExecutorService pool = Executors.newFixedThreadPool(2 * max_conn);
        ActiveHandlers activeHandlers = new ActiveHandlers();
        // try-with-resources releases the listening port on every exit path.
        try (ServerSocket sSocket = new ServerSocket(port)) {
            do {
                Socket clientSocket = sSocket.accept();
                clientSocket.setKeepAlive(true);
                SocketHandler handler = new SocketHandler(clientSocket, activeHandlers);
                pool.execute(handler.inputHandler);
                pool.execute(handler.outputHandler);
            } while (!pool.isTerminated());
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            // Previously the pool was shut down only inside the IOException handler.
            shutdownPool(pool);
        }
    }

    /**
     * Decodes an integer command-line argument.
     *
     * @return the decoded value, or {@code fallback} (after a note on stderr)
     *         when the argument is not an integer or lies outside
     *         {@code [min, max]}
     */
    private static int parseArg(String arg, int fallback, int min, int max) {
        try {
            int value = Integer.decode(arg);
            if (value >= min && value <= max) {
                return value;
            }
            System.err.printf("Argument %s is out of range [%d, %d], using default value %d\n",
                    arg, min, max, fallback);
        } catch (NumberFormatException e) {
            System.err.printf("Argument %s is not integer, using default value %d\n", arg, fallback);
        }
        return fallback;
    }

    /** Shuts the pool down in two phases: orderly first, then by cancelling running tasks. */
    private static void shutdownPool(ExecutorService pool) {
        pool.shutdown();
        try {
            // Wait a while for existing tasks to terminate
            if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
                pool.shutdownNow(); // Cancel currently executing tasks
                // Wait a while for tasks to respond to being cancelled
                if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
                    System.err.println("Pool did not terminate");
                }
            }
        } catch (InterruptedException ie) {
            // (Re-)Cancel if current thread also interrupted
            pool.shutdownNow();
            // Preserve interrupt status
            Thread.currentThread().interrupt();
        }
    }
}