View Javadoc

1   /*
2    * AI Soccer Project - network gaming environment for AI warriors.
3    * Copyright (C) 2001-2004  Marcin Werla, Pawel Widera
4    *
5    * This program is free software; you can redistribute it and/or
6    * modify it under the terms of the GNU General Public License
7    * as published by the Free Software Foundation; either version 2
8    * of the License, or (at your option) any later version.
9    *
10   * This program is distributed in the hope that it will be useful,
11   * but WITHOUT ANY WARRANTY; without even the implied warranty of
12   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13   * GNU General Public License for more details.
14   *
15   * You should have received a copy of the GNU General Public License
16   * along with this program; if not, you can find it here:
17   * http://www.gnu.org/licenses/gpl.html
18   */
19  
20  package aigames.soccer.communication;
21  
22  import org.apache.log4j.Logger;
23  
24  import org.dom4j.Document;
25  import org.dom4j.DocumentException;
26  
27  import org.dom4j.io.SAXReader;
28  
29  import snifos.common.UserId;
30  
31  import snifos.communication.CommModule;
32  
33  import snifos.exception.ConfigurationException;
34  import snifos.exception.InvalidUserException;
35  
36  import snifos.server.Server;
37  
38  import java.io.ByteArrayInputStream;
39  import java.io.IOException;
40  import java.io.InputStream;
41  
42  import java.net.InetAddress;
43  import java.net.InetSocketAddress;
44  
45  import java.nio.ByteBuffer;
46  import java.nio.CharBuffer;
47  import java.nio.channels.SelectionKey;
48  import java.nio.channels.Selector;
49  import java.nio.channels.ServerSocketChannel;
50  import java.nio.channels.SocketChannel;
51  import java.nio.charset.CharacterCodingException;
52  import java.nio.charset.Charset;
53  import java.nio.charset.CharsetDecoder;
54  
55  import java.util.HashMap;
56  import java.util.Iterator;
57  import java.util.Map;
58  import java.util.Properties;
59  import java.util.Set;
60  
61  
62  /***
63   * The socket communication module class.
64   * @version $Id: SocketCommModule.java,v 1.5 2004/05/08 21:55:26 mwerla Exp $
65   */
66  public class SocketCommModule implements CommModule {
67      /* end of transmission string */
68      private static String EOT = "\n\004\n";
69      private static Logger logger = Logger.getLogger(SocketCommModule.class);
70      private Server server;
71      private Map userIdToKeyMap = new HashMap();
72      private int userCounter = 0;
73      private boolean active = true;
74      private boolean test = false; //to indicate JUnit test mode
75      private int id = 0;
76      private int port = 9669;
77      private String name = "UNSET";
78      private String charsetName = "UTF-8";
79  
80      /***
81           * Default constructor.
82           */
83      public SocketCommModule() {
84          logger.info("CommModule created.");
85      }
86  
87      /***
88           * @see snifos.communication.CommModule#configure(java.util.Properties, int)
89           */
90      public void configure(Properties configuration, int commModuleId) {
91          this.id = commModuleId;
92          this.port = Integer.parseInt(configuration.getProperty("port"));
93          this.name = configuration.getProperty("name");
94          this.charsetName = configuration.getProperty("charset-name");
95          logger.info("CommModule" + id + " configured.");
96      }
97  
98      /***
99           * @see snifos.communication.CommModule#setServer(snifos.server.Server)
100          */
101     public void setServer(Server server) {
102         this.server = server;
103         logger.info("CommModule" + id + " server set.");
104     }
105 
106     /***
107      * @see snifos.communication.CommModule#sendMessage(snifos.common.UserId, org.dom4j.Document)
108      */
109     public void sendMessage(UserId userId, Document message)
110         throws InvalidUserException {
111         String xml = message.asXML() + EOT;
112         logger.info("Sending message to " + userId + " from CommModule" + id +
113             ":\n" + xml);
114 
115         ByteBuffer buffer = ByteBuffer.wrap(xml.getBytes());
116         SelectionKey clientKey = (SelectionKey) userIdToKeyMap.get(userId);
117 
118         if (null == clientKey) {
119             throw new InvalidUserException();
120         }
121 
122         SocketChannel client = (SocketChannel) clientKey.channel();
123 
124         try {
125             int n = 0;
126 
127             while (buffer.remaining() > 0) {
128                 n += client.write(buffer);
129             }
130 
131             logger.info("CommModule" + id + " sent " + n + " bytes to " +
132                 userId);
133         } catch (IOException e) {
134             logger.warn("Client socket write error: " + e.getMessage());
135             server.unregisterUser(userId);
136             disconnectUser(userId);
137         }
138     }
139 
140     /***
141          * @see snifos.communication.CommModule#disconnectUser(snifos.common.UserId)
142          */
143     public void disconnectUser(UserId userId) {
144         SelectionKey clientKey = (SelectionKey) userIdToKeyMap.get(userId);
145         SocketChannel client = (SocketChannel) clientKey.channel();
146 
147         try {
148             client.socket().close();
149             clientKey.cancel(); //remove the key from selector						
150         } catch (IOException e) {
151             logger.error("Client socket closing error: " + e.getMessage());
152         }
153 
154         userIdToKeyMap.remove(userId);
155         logger.info("User " + userId + " disconnected from CommModule" + id);
156     }
157 
158     /***
159      * @see snifos.communication.CommModule#shutdown()
160      */
161     public void shutdown() {
162         active = false;
163     }
164 
165     /***
166      * @see java.lang.Runnable#run()
167      */
168     public void run() {
169         logger.info("Hello from CommModule" + id + " with name " + name);
170 
171         ServerSocketChannel socket = createServerSocketChannel();
172 
173         if (null == socket) {
174             return;
175         }
176 
177         Selector selector = createSelector(socket);
178 
179         if (null == selector) {
180             return;
181         }
182 
183         while (active) {
184             try {
185                 logger.info("CommModule" + id + " waiting...");
186                 selector.select(); //wait on a socket for an event
187             } catch (IOException e) {
188                 logger.error("Select error: " + e.getMessage());
189 
190                 try {
191                     selector.close();
192                     socket.close();
193                 } catch (IOException ee) {
194                     logger.error("Server socket closing error: " +
195                         ee.getMessage());
196                 }
197 
198                 return;
199             } catch (NullPointerException e) {
200                 //ignoring internal exception - java bug #4729342
201             } catch (java.nio.channels.CancelledKeyException e) {
202                 //ignoring internal exception - java bug #4729342
203             }
204 
205             Set activeKeys = selector.selectedKeys();
206             Iterator i = activeKeys.iterator();
207 
208             while (i.hasNext()) {
209                 SelectionKey key = (SelectionKey) i.next();
210                 i.remove();
211 
212                 //if connection request
213                 try {
214                     if (key.isAcceptable()) {
215                         SocketChannel client = acceptClient(socket, selector);
216 
217                         //create new user
218                         UserId userId = new UserId(id, userCounter++);
219 
220                         //attach client data object to a key					
221                         SelectionKey clientKey = client.keyFor(selector);
222                         clientKey.attach(new ClientData(client, userId));
223 
224                         //map user to selection key
225                         userIdToKeyMap.put(userId, clientKey);
226 
227                         logger.info("CommModule" + id + " user " + userId +
228                             " accepted.");
229 
230                         continue;
231                     }
232 
233                     //if ready to read
234                     if (key.isReadable()) {
235                         SocketChannel client = (SocketChannel) key.channel();
236                         String message = readMessage(client);
237                         ClientData data = (ClientData) key.attachment();
238 
239                         if (null == message) {
240                             server.unregisterUser(data.getUserId());
241                             disconnectUser(data.getUserId());
242                         } else {
243                             processData(message, data);
244                         }
245                     }
246                 } catch (java.nio.channels.CancelledKeyException e) {
247                     //ignoring internal exception - java bug #4729342
248                 } catch (NullPointerException e) {
249                     //ignoring internal exception - java bug #4729342
250                 }
251             }
252         }
253 
254         logger.info("CommModule" + id + " shutting down.");
255     }
256 
257     private ServerSocketChannel createServerSocketChannel() {
258         ServerSocketChannel socket = null;
259 
260         try {
261             socket = ServerSocketChannel.open();
262 
263             //set nonblocking I/O
264             socket.configureBlocking(false);
265 
266             InetAddress address = InetAddress.getLocalHost();
267             InetSocketAddress socketAddress = new InetSocketAddress(address,
268                     port);
269             socket.socket().setReuseAddress(true);
270             socket.socket().bind(socketAddress);
271         } catch (IOException e) {
272             logger.error("Server socket creation error: " + e.getMessage());
273 
274             return null;
275         }
276 
277         logger.info("CommModule" + id + " server socket set.");
278 
279         return socket;
280     }
281 
282     private Selector createSelector(ServerSocketChannel socket) {
283         Selector selector = null;
284 
285         try {
286             selector = Selector.open();
287             socket.register(selector, SelectionKey.OP_ACCEPT);
288         } catch (Exception e) {
289             logger.error("Selector creation error: " + e.getMessage());
290 
291             try {
292                 socket.close();
293             } catch (IOException ee) {
294                 logger.error("Server socket closing error: " + ee.getMessage());
295             }
296 
297             return null;
298         }
299 
300         return selector;
301     }
302 
303     private SocketChannel acceptClient(ServerSocketChannel socket,
304         Selector selector) {
305         SocketChannel client = null;
306 
307         try {
308             client = socket.accept();
309             client.configureBlocking(false);
310 
311             //add socket read event to the selector
312             client.register(selector, SelectionKey.OP_READ);
313         } catch (IOException e) {
314             logger.error("Client socket accept error: " + e.getMessage());
315         }
316 
317         return client;
318     }
319 
320     private String readMessage(SocketChannel client) {
321         int BUFFER_SIZE = 4096;
322 
323         ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
324 
325         try {
326             client.read(buffer);
327         } catch (IOException e) {
328             logger.warn("Client socket read error: " + e.getMessage());
329 
330             return null;
331         }
332 
333         buffer.flip();
334 
335         if (0 == buffer.limit()) {
336             logger.info("Closing client on 0 read.");
337 
338             return null;
339         }
340 
341         return decodeCharset(buffer);
342     }
343 
344     /***
345      * Bytes to characters decoding with given charset.
346      * @param buffer bytes to decode
347      * @return String decoded string
348      */
349     private String decodeCharset(ByteBuffer buffer) {
350         String message = null;
351 
352         try {
353             Charset charset = Charset.forName(charsetName);
354             CharsetDecoder decoder = charset.newDecoder();
355             CharBuffer charBuffer = decoder.decode(buffer);
356             message = charBuffer.toString();
357         } catch (CharacterCodingException e) {
358             logger.error("Character decoding error: " + e.getMessage());
359         }
360 
361         return message;
362     }
363 
364     private void processData(String message, ClientData data) {
365         logger.info("CommModule" + id + " received message from " +
366             data.getUserId() + ":\n" + message);
367         data.append(message);
368 
369         int messageBegin = 0;
370         int messageEnd = data.getBuffer().indexOf(EOT);
371 
372         while (-1 < messageEnd) {
373             String xml = data.getBuffer().substring(messageBegin, messageEnd);
374             InputStream stream = new ByteArrayInputStream(xml.getBytes());
375             SAXReader reader = new SAXReader();
376 
377             try {
378                 Document doc = reader.read(stream);
379 
380                 if (test) {
381                     sendMessage(data.getUserId(), doc);
382                 } else {
383                     server.receiveMessage(data.getUserId(), doc);
384                 }
385 
386                 messageBegin = messageEnd + EOT.length();
387                 messageEnd = data.getBuffer().indexOf(EOT, messageBegin);
388             } catch (DocumentException e) {
389                 logger.error("CommModule" + id + " parsing error: " +
390                     e.getMessage());
391                 messageEnd = -1;
392             } catch (InvalidUserException e) {
393                 logger.error("CommModule" + id + " user exception: " +
394                     e.getMessage());
395                 server.unregisterUser(data.getUserId());
396                 disconnectUser(data.getUserId());
397                 messageEnd = -1;
398             } catch (ConfigurationException e) {
399                 logger.error("CommModule" + id + " configuration exception: " +
400                     e.getMessage());
401                 shutdown();
402                 messageEnd = -1;
403             }
404         }
405 
406         data.clearBuffer(messageBegin);
407     }
408 
409     /***
410      * Setter for test property.
411      * @param b
412      */
413     public void setTest(boolean b) {
414         test = b;
415     }
416 }