1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.caleigo.toolkit.tunnel;
20
21
22 import java.util.*;
23 import java.io.*;
24
25 import org.caleigo.toolkit.log.*;
26 import org.caleigo.toolkit.util.CircularByteBuffer;
27
28 /***
29 *
30 * @author Mattias Hagstrand
31 * @version 1.00
32 *
33 *//*
34 *
35 * WHEN WHO WHY & WHAT
36 * -----------------------------------------------------------------------------
37 * 2002-07-24 Mattias Hagstrand Creation
38 */
39 public class HTTPTunnelServer extends AbstractTunnelServer
40 {
41
42 protected int mCurrentSessionID;
43 protected Hashtable mRegisteredTunnels;
44
45
46 public HTTPTunnelServer()
47 {
48 mRegisteredTunnels = new Hashtable();
49 this.addMessageConsumer(new CloseMessageConsumer());
50 }
51
52
53
54 /***
55 * Sets up a new connection. This method will block until the connection is
56 * closed.
57 */
58 protected void newConnection(Object connectionMessage, OutputStream outputStream, int sessionID)
59 {
60 Log.print(this, "newConnection: " + sessionID);
61
62 ObjectOutputStream objectOutputStream = null;
63 try
64 {
65 objectOutputStream = new ObjectOutputStream(outputStream);
66 }
67 catch (IOException ioe)
68 {
69 Log.printError(this, "Couldn't create output stream", ioe);
70 return;
71 }
72
73 HTTPTunnel tunnel = null;
74 try
75 {
76
77 HTTPInputStream httpInputStream = new HTTPInputStream(1024);
78 tunnel = new HTTPTunnel(httpInputStream, outputStream);
79 setUpTunnel(connectionMessage, tunnel);
80 Log.print(this, "Connection set up successfully!");
81 Object returnMessage = new AbstractTunnel.DefaultStatusMessage(AbstractTunnel.IStatusMessage.OK,
82 "Connection established successfully");
83 returnMessage = new HTTPTunnel.HTTPTunnelStatusMessage((AbstractTunnel.IStatusMessage) returnMessage, sessionID);
84
85
86 Log.print(this, "Sending return message: " + returnMessage.toString());
87 objectOutputStream.writeObject(returnMessage);
88 objectOutputStream.flush();
89 }
90 catch (Exception e)
91 {
92 Log.printError(this, "Couldn't set up connection", e);
93 Object returnMessage = new AbstractTunnel.DefaultStatusMessage(AbstractTunnel.IStatusMessage.NOT_OK,
94 e.getMessage());
95 try
96 {
97 objectOutputStream.writeObject(returnMessage);
98 objectOutputStream.flush();
99 }
100 catch (IOException ioe)
101 {
102 Log.printError(this, "Couldn't send return message", ioe);
103 }
104 return;
105 }
106 try
107 {
108 tunnel.initialize();
109
110
111
112 Integer sessionIDKey = new Integer(sessionID);
113 mRegisteredTunnels.put(sessionIDKey, tunnel);
114 this.waitForSessionClose(sessionIDKey);
115 }
116 catch (Exception e)
117 {
118 Log.printError(this, "Couldn't initialize tunnel", e);
119 }
120 }
121
122 protected void closeConnection(int sessionID)
123 {
124 Log.print(this, "closeConnection: " + sessionID);
125 HTTPTunnel tunnel = (HTTPTunnel) mRegisteredTunnels.get(new Integer(sessionID));
126 if (tunnel != null)
127
128 try
129 {
130 if (tunnel.isActive())
131 tunnel.finalize();
132 }
133 catch (Exception e)
134 {
135 Log.printError(this, "Couldn't finalize tunnel", e);
136 }
137 finally
138 {
139 this.doOnTunnelClosed(tunnel);
140 }
141 }
142
143
144 public void tunnelClosed(TunnelEvent event)
145 {
146 super.tunnelClosed(event);
147 this.doOnTunnelClosed(event.getSourceTunnel());
148 }
149
150 public void connectionLost(TunnelEvent event)
151 {
152 super.connectionLost(event);
153 this.doOnTunnelClosed(event.getSourceTunnel());
154 }
155
156
157 protected synchronized int createSessionID()
158 {
159 if (mCurrentSessionID < Integer.MAX_VALUE)
160 mCurrentSessionID++;
161 else
162 mCurrentSessionID = 1;
163
164 return mCurrentSessionID;
165 }
166
167 protected synchronized void handleInputData(int sessionID, byte[] data)
168 {
169 HTTPInputStream httpInputStream = (HTTPInputStream) ((HTTPTunnel) mRegisteredTunnels.get(new Integer(sessionID))).getInputStream();
170 if (httpInputStream != null)
171 httpInputStream.addInputData(data);
172 else
173 Log.printWarning(this, "Couldn't handle input data for session id " + sessionID);
174 }
175
176
177
178 protected synchronized void doOnTunnelClosed(ITunnel tunnel)
179 {
180 if (tunnel != null)
181 synchronized (tunnel)
182 {
183 tunnel.notifyAll();
184 }
185 }
186
187 protected void waitForSessionClose(Integer sessionID)
188 {
189 synchronized (mRegisteredTunnels.get(sessionID))
190 {
191 try
192 {
193
194 mRegisteredTunnels.get(sessionID).wait();
195
196
197 mRegisteredTunnels.remove(sessionID);
198 }
199 catch (InterruptedException ie)
200 {
201 Log.printError(this, "Interrupted", ie);
202 }
203 }
204
205 Log.print(this, "Session closed");
206 }
207
208
209 protected class CloseMessageConsumer implements IMessageConsumer
210 {
211
212
213 /*** Returns <code>true</code> if this IMessageConsumer accepts the message.
214 * This method is allways called before <code>consumeMessage</code> for any
215 * given message.
216 *
217 */
218 public boolean acceptsMessage(Object message)
219 {
220 return (message instanceof HTTPTunnel.HTTPTunnelCloseMessage);
221 }
222
223 /*** Tells the IMessageConsumer to consume a message and returns a new message
224 * that will be delivered to the sender of the originial message.
225 *
226 */
227 public Object answerMessage(Object message)
228 {
229 return null;
230 }
231
232 /*** Tells the IMessageConsumer to consume a message.
233 *
234 */
235 public void consumeMessage(Object message)
236 {
237 Log.print(this, "Close message received");
238 closeConnection(((HTTPTunnel.HTTPTunnelCloseMessage) message).getSessionID());
239 }
240 }
241
242 protected class HTTPInputStream extends InputStream
243 {
244
245 protected byte[] mSingleByteBuffer = new byte[1];
246 protected CircularByteBuffer mBuffer;
247 protected boolean mEOF;
248 protected boolean mClosed;
249
250
251 public HTTPInputStream(int bufferSize)
252 {
253 mBuffer = new CircularByteBuffer(bufferSize);
254 }
255
256
257 public int available()
258 throws IOException
259 {
260 return mBuffer.getBufferSize();
261 }
262
263 public void close()
264 throws IOException
265 {
266 mClosed = true;
267 synchronized (mBuffer)
268 {
269 mBuffer.notifyAll();
270 }
271 }
272
273 public int read()
274 throws IOException
275 {
276 this.read(mSingleByteBuffer, 0, 1);
277 return mSingleByteBuffer[0];
278 }
279
280 public int read(byte[] b, int off, int len)
281 throws IOException
282 {
283 synchronized (mBuffer)
284 {
285 while (mBuffer.getBufferSize() == 0)
286 {
287 try
288 {
289 if (mEOF)
290 throw new EOFException();
291
292 if (mClosed)
293 return -1;
294
295 mBuffer.wait();
296 }
297 catch (InterruptedException ie)
298 {
299 throw new IOException(ie.getMessage());
300 }
301 }
302 }
303
304 return mBuffer.getFromBuffer(b, off, len);
305 }
306
307
308 protected void addInputData(byte[] data)
309 {
310 synchronized (mBuffer)
311 {
312 mBuffer.addToBuffer(data, 0, data.length);
313 mBuffer.notifyAll();
314 }
315 }
316
317 protected void eof()
318 {
319 mEOF = true;
320 synchronized (mBuffer)
321 {
322 mBuffer.notifyAll();
323 }
324 }
325 }
326 }