|
|
|
@ -0,0 +1,166 @@
|
|
|
|
|
package imserver;
|
|
|
|
|
import java.io.*;
|
|
|
|
|
import java.net.*;
|
|
|
|
|
import java.util.concurrent.*;
|
|
|
|
|
import java.util.*;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class ActiveHandlers {
|
|
|
|
|
private static final long serialVersionUID = 1L;
|
|
|
|
|
private HashSet<SocketHandler> activeHandlersSet=new HashSet<SocketHandler>();
|
|
|
|
|
|
|
|
|
|
synchronized void sendMessageToAll(SocketHandler sender, String message) {
|
|
|
|
|
for (SocketHandler handler:activeHandlersSet)
|
|
|
|
|
if (handler!=sender) {
|
|
|
|
|
if (!handler.messages.offer(message))
|
|
|
|
|
System.err.printf("Client %s message queue is full, dropping the message!\n", handler.clientID);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
synchronized boolean add(SocketHandler handler) {
|
|
|
|
|
return activeHandlersSet.add(handler);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
synchronized boolean remove(SocketHandler handler) {
|
|
|
|
|
return activeHandlersSet.remove(handler);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class SocketHandler {
|
|
|
|
|
Socket mySocket;
|
|
|
|
|
String clientID;
|
|
|
|
|
ActiveHandlers activeHandlers;
|
|
|
|
|
|
|
|
|
|
ArrayBlockingQueue<String> messages=new ArrayBlockingQueue<String>(20);
|
|
|
|
|
|
|
|
|
|
CountDownLatch startSignal=new CountDownLatch(2);
|
|
|
|
|
|
|
|
|
|
OutputHandler outputHandler=new OutputHandler();
|
|
|
|
|
InputHandler inputHandler=new InputHandler();
|
|
|
|
|
volatile boolean inputFinished=false;
|
|
|
|
|
|
|
|
|
|
public SocketHandler(Socket mySocket, ActiveHandlers activeHandlers) {
|
|
|
|
|
this.mySocket=mySocket;
|
|
|
|
|
clientID=mySocket.getInetAddress().toString()+":"+mySocket.getPort();
|
|
|
|
|
this.activeHandlers=activeHandlers;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
class OutputHandler implements Runnable {
|
|
|
|
|
public void run() {
|
|
|
|
|
OutputStreamWriter writer;
|
|
|
|
|
try {
|
|
|
|
|
System.err.println("DBG>Output handler starting for "+clientID);
|
|
|
|
|
startSignal.countDown(); startSignal.await();
|
|
|
|
|
System.err.println("DBG>Output handler running for "+clientID);
|
|
|
|
|
writer = new OutputStreamWriter(mySocket.getOutputStream(), "UTF-8");
|
|
|
|
|
writer.write("\nYou are connected from " + clientID+"\n");
|
|
|
|
|
writer.flush();
|
|
|
|
|
while (!inputFinished) {
|
|
|
|
|
String m=messages.take();
|
|
|
|
|
writer.write(m+"\r\n");
|
|
|
|
|
writer.flush();
|
|
|
|
|
System.err.println("DBG>Message sent to "+clientID+":"+m+"\n");
|
|
|
|
|
}
|
|
|
|
|
} catch (IOException e) {
|
|
|
|
|
e.printStackTrace();
|
|
|
|
|
} catch (InterruptedException e) {
|
|
|
|
|
e.printStackTrace();
|
|
|
|
|
}
|
|
|
|
|
System.err.println("DBG>Output handler for "+clientID+" has finished.");
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
class InputHandler implements Runnable {
|
|
|
|
|
public void run() {
|
|
|
|
|
try {
|
|
|
|
|
System.err.println("DBG>Input handler starting for "+clientID);
|
|
|
|
|
startSignal.countDown(); startSignal.await();
|
|
|
|
|
System.err.println("DBG>Input handler running for "+clientID);
|
|
|
|
|
String request="";
|
|
|
|
|
|
|
|
|
|
activeHandlers.add(SocketHandler.this);
|
|
|
|
|
BufferedReader reader=new BufferedReader(new InputStreamReader(mySocket.getInputStream(),"UTF-8"));
|
|
|
|
|
while ((request=reader.readLine())!=null) {
|
|
|
|
|
request="From client "+clientID+": "+request;
|
|
|
|
|
System.out.println(request);
|
|
|
|
|
activeHandlers.sendMessageToAll(SocketHandler.this,request);
|
|
|
|
|
}
|
|
|
|
|
inputFinished=true;
|
|
|
|
|
messages.offer("OutputHandler, wakeup and die!");
|
|
|
|
|
} catch (UnknownHostException e) {
|
|
|
|
|
e.printStackTrace();
|
|
|
|
|
} catch (IOException e) {
|
|
|
|
|
e.printStackTrace();
|
|
|
|
|
} catch (InterruptedException e) {
|
|
|
|
|
e.printStackTrace();
|
|
|
|
|
} finally {
|
|
|
|
|
synchronized (activeHandlers) {
|
|
|
|
|
activeHandlers.remove(SocketHandler.this);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
System.err.println("DBG>Input handler for "+clientID+" has finished.");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
public class Server {
|
|
|
|
|
|
|
|
|
|
public static void main(String[] args) {
|
|
|
|
|
int port=33000, max_conn=2;
|
|
|
|
|
|
|
|
|
|
if (args.length>0) {
|
|
|
|
|
if (args[0].startsWith("--help")) {
|
|
|
|
|
System.out.printf("Usage: Server [PORT] [MAX_CONNECTIONS]\n" +
|
|
|
|
|
"If PORT is not specified, default port %d is used\n" +
|
|
|
|
|
"If MAX_CONNECTIONS is not specified, default number=%d is used",port, max_conn);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
try {
|
|
|
|
|
port=Integer.decode(args[0]);
|
|
|
|
|
} catch (NumberFormatException e) {
|
|
|
|
|
System.err.printf("Argument %s is not integer, using default value",args[0],port);
|
|
|
|
|
}
|
|
|
|
|
if (args.length>1) try {
|
|
|
|
|
max_conn=Integer.decode(args[1]);
|
|
|
|
|
} catch (NumberFormatException e) {
|
|
|
|
|
System.err.printf("Argument %s is not integer, using default value",args[1],max_conn);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
System.out.printf("IM server listening on port %d, maximum nr. of connections=%d...\n", port, max_conn);
|
|
|
|
|
ExecutorService pool=Executors.newFixedThreadPool(2*max_conn);
|
|
|
|
|
ActiveHandlers activeHandlers=new ActiveHandlers();
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
ServerSocket sSocket=new ServerSocket(port);
|
|
|
|
|
do {
|
|
|
|
|
Socket clientSocket=sSocket.accept();
|
|
|
|
|
clientSocket.setKeepAlive(true);
|
|
|
|
|
SocketHandler handler=new SocketHandler(clientSocket, activeHandlers);
|
|
|
|
|
pool.execute(handler.inputHandler);
|
|
|
|
|
pool.execute(handler.outputHandler);
|
|
|
|
|
} while (!pool.isTerminated());
|
|
|
|
|
} catch (UnknownHostException e) {
|
|
|
|
|
e.printStackTrace();
|
|
|
|
|
} catch (IOException e) {
|
|
|
|
|
e.printStackTrace();
|
|
|
|
|
pool.shutdown();
|
|
|
|
|
try {
|
|
|
|
|
// Wait a while for existing tasks to terminate
|
|
|
|
|
if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
|
|
|
|
|
pool.shutdownNow(); // Cancel currently executing tasks
|
|
|
|
|
// Wait a while for tasks to respond to being cancelled
|
|
|
|
|
if (!pool.awaitTermination(60, TimeUnit.SECONDS))
|
|
|
|
|
System.err.println("Pool did not terminate");
|
|
|
|
|
}
|
|
|
|
|
} catch (InterruptedException ie) {
|
|
|
|
|
// (Re-)Cancel if current thread also interrupted
|
|
|
|
|
pool.shutdownNow();
|
|
|
|
|
// Preserve interrupt status
|
|
|
|
|
Thread.currentThread().interrupt();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|