因为,我们生活在云计算和并行时代。这里有另一种方法。
public class DistributedHistogram
{
private static final int PORT = 1337;
private static final String LOOPBACK = "127.0.13.37";
public static final byte[] DATA = new byte[] {(byte) 0xFF, (byte) 0xFF};
public static final byte[] STOP = new byte[] {(byte) 0xDE, (byte) 0xAD};
public static void main(String[] args) throws IOException, InterruptedException
{
ExecutorService se = Executors.newSingleThreadExecutor();
se.submit(new Server(PORT, 16));
System.out.print("Please insert string: ");
Scanner s = new Scanner(System.in);
String input = s.nextLine();
s.close();
System.out.println(input);
ExecutorService ce = Executors.newFixedThreadPool(16);
List<Future<Void>> futures = new ArrayList<Future<Void>>();
for (char c : input.toCharArray())
futures.add(ce.submit(new Client(new Character[]{c}, DATA, LOOPBACK, PORT)));
/* wait for the clients to complete before we send stop to server */
for (Future<Void> f : futures)
{
try
{
@SuppressWarnings ("unused")
Void v = f.get();
}
catch (ExecutionException e)
{
//...
}
}
ce.submit(new StopClient(LOOPBACK, PORT)); // sends stop signal
ce.shutdown();
se.shutdown();
}
}
class Client implements Callable<Void>
{
private final Character[] chars;
private final String ip;
private final int port;
private final byte[] type;
public Client(Character[] chars, byte[] type, String ip, int port)
{
this.chars = chars;
this.type = type;
this.ip = ip;
this.port = port;
}
@Override
public Void call() throws Exception
{
Socket s = new Socket(ip, port);
DataOutputStream out = new DataOutputStream(s.getOutputStream());
for (Character c : chars) {
out.write(type);
out.writeChar(c);
}
out.flush();
out.close();
s.close();
return null;
}
}
class StopClient extends Client {
public StopClient(String ip, int port)
{
super(new Character[]{' '}, DistributedHistogram.STOP, ip, port);
}
}
class Server implements Callable<Void>
{
private final int port;
private ServerSocket ss;
private final ExecutorService e;
private final ConcurrentHistogram ch = new ConcurrentHistogram();
private final AtomicInteger client = new AtomicInteger();
private AtomicBoolean quit = new AtomicBoolean(false);
public Server(int port, int clients)
{
this.port = port;
this.e = Executors.newFixedThreadPool(clients);
}
public ConcurrentHistogram getHistogram() { return ch; }
public void stop()
{
quit.set(true);
e.submit(new Callable<Void>()
{
@Override
public Void call() throws Exception
{
Thread.sleep(250);
ss.close();
return null;
}
});
}
@Override
public Void call() throws Exception
{
ss = new ServerSocket(port);
while (!quit.get() && !ss.isClosed())
{
try
{
e.submit(new ClientHandler(client.getAndIncrement(), ss.accept(), this));
}
catch (SocketException se)
{ continue; }
}
e.shutdown();
System.out.println(ch.toString());
while (!e.isTerminated()) { /* wait */ }
return null;
}
}
class ConcurrentHistogram
{
private final ConcurrentMap<Character, AtomicInteger> histogram = new ConcurrentHashMap<Character, AtomicInteger>();
private static final String HISTOGRAM_CHAR = "*";
public ConcurrentMap<Character, AtomicInteger> getHistogram() { return histogram; }
private String createAsterisk(int number)
{
StringBuilder sb = new StringBuilder();
for (int i = 0; i < number; i++)
sb.append(HISTOGRAM_CHAR);
return sb.toString();
}
@Override
public String toString()
{
StringBuilder sb = new StringBuilder();
List<Entry<Character, AtomicInteger>> data = new ArrayList<Entry<Character, AtomicInteger>>(histogram.entrySet());
Collections.sort(data, new Comparator<Entry<Character, AtomicInteger>>()
{
@Override
public int compare(Entry<Character, AtomicInteger> o1, Entry<Character, AtomicInteger> o2)
{
return o1.getKey().compareTo(o2.getKey());
}
});
for (Entry<Character, AtomicInteger> entry : data)
{
int value = entry.getValue().get();
sb.append(entry.getKey() + " " + String.format("%4s", "(" + value + ")") + " " + createAsterisk(value) + "\n");
}
return sb.toString();
}
public void addChar(Character c)
{
AtomicInteger value = histogram.get(c);
if (value == null)
{
histogram.putIfAbsent(c, new AtomicInteger());
value = histogram.get(c);
}
value.incrementAndGet();
}
}
class ClientHandler implements Callable<Void>
{
@SuppressWarnings ("unused")
private final int client;
private final Socket s;
private final Server server;
public ClientHandler(int client, Socket s, Server server)
{
this.client = client;
this.s = s;
this.server = server;
}
@Override
public Void call() throws Exception
{
DataInputStream in = new DataInputStream(s.getInputStream());
int c;
int i = 0;
byte[] bytes = new byte[2];
while ((c = in.read()) != -1)
{
if (i < 2)
{ bytes[i++] = ((byte) c); }
else if (Arrays.equals(bytes, DistributedHistogram.DATA))
{
i = 0;
char ch = (char) (((c & 0x00FF) << 8) + (in.read() & 0x00FF));
server.getHistogram().addChar(ch);
}
else if (Arrays.equals(bytes, DistributedHistogram.STOP))
{
i = 0;
server.stop();
}
else
{ i = 0; }
}
in.close();
s.close();
return null;
}
}