View Javadoc

1   /*
2    * Copyright 2008-2011 Thomas Nichols.  http://blog.thomnichols.org
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *     http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   *
16   * You are receiving this code free of charge, which represents many hours of
17   * effort from other individuals and corporations.  As a responsible member 
18   * of the community, you are encouraged (but not required) to donate any 
19   * enhancements or improvements back to the community under a similar open 
20   * source license.  Thank you. -TMN
21   */
22  package groovyx.net.http;
23  
24  import java.io.IOException;
25  import java.net.URISyntaxException;
26  import java.util.Map;
27  import java.util.concurrent.Callable;
28  import java.util.concurrent.ExecutionException;
29  import java.util.concurrent.ExecutorService;
30  import java.util.concurrent.Future;
31  import java.util.concurrent.LinkedBlockingQueue;
32  import java.util.concurrent.ThreadPoolExecutor;
33  import java.util.concurrent.TimeUnit;
34  
35  import org.apache.http.HttpVersion;
36  import org.apache.http.conn.ClientConnectionManager;
37  import org.apache.http.conn.params.ConnManagerParams;
38  import org.apache.http.conn.params.ConnPerRouteBean;
39  import org.apache.http.conn.scheme.PlainSocketFactory;
40  import org.apache.http.conn.scheme.Scheme;
41  import org.apache.http.conn.scheme.SchemeRegistry;
42  import org.apache.http.conn.ssl.SSLSocketFactory;
43  import org.apache.http.impl.client.DefaultHttpClient;
44  import org.apache.http.impl.conn.tsccm.ThreadSafeClientConnManager;
45  import org.apache.http.params.BasicHttpParams;
46  import org.apache.http.params.HttpConnectionParams;
47  import org.apache.http.params.HttpParams;
48  import org.apache.http.params.HttpProtocolParams;
49  
50  /**
51   * This implementation makes all requests asynchronous by submitting jobs to a 
52   * {@link ThreadPoolExecutor}.  All request methods (including <code>get</code> 
53   * and <code>post</code>) return a {@link Future} instance, whose 
54   * {@link Future#get() get} method will provide access to whatever value was 
55   * returned from the response handler closure.
56   *  
57   * @author <a href='mailto:tomstrummer+httpbuilder@gmail.com'>Tom Nichols</a>
58   */
59  public class AsyncHTTPBuilder extends HTTPBuilder {
60  
61  	/**
62  	 * Default pool size is one is not supplied in the constructor.
63  	 */
64  	public static final int DEFAULT_POOL_SIZE = 4;
65  	
66  	protected ExecutorService threadPool;
67  //		= (ThreadPoolExecutor)Executors.newCachedThreadPool();
68  	
69  	/**
70  	 * Accepts the following named parameters:
71  	 * <dl>
72  	 *  <dt>threadPool</dt><dd>Custom {@link ExecutorService} instance for 
73  	 *  	running submitted requests.  If this is an instance of {@link ThreadPoolExecutor},
74  	 *      the poolSize will be determined by {@link ThreadPoolExecutor#getMaximumPoolSize()}.
75  	 *      The default threadPool uses an unbounded queue to accept an unlimited 
76  	 *      number of requests.</dd>
77  	 *  <dt>poolSize</dt><dd>Max number of concurrent requests</dd>
78  	 *  <dt>uri</dt><dd>Default request URI</dd>
79  	 *  <dt>contentType</dt><dd>Default content type for requests and responses</dd>
80  	 *  <dt>timeout</dt><dd>Timeout in milliseconds to wait for a connection to 
81  	 *  	be established and request to complete.</dd>
82  	 * </dl>
83  	 */
84  	public AsyncHTTPBuilder( Map<String, ?> args ) throws URISyntaxException {
85  		super();
86  		int poolSize = DEFAULT_POOL_SIZE;
87  		ExecutorService threadPool = null;
88  		if ( args != null ) { 
89  			threadPool = (ExecutorService)args.remove( "threadPool" );
90  
91  			if ( threadPool instanceof ThreadPoolExecutor )
92  				poolSize = ((ThreadPoolExecutor)threadPool).getMaximumPoolSize();
93  			
94  			Object poolSzArg = args.remove("poolSize");
95  			if ( poolSzArg != null ) poolSize = Integer.parseInt( poolSzArg.toString() );
96  			
97  			if ( args.containsKey( "url" ) ) throw new IllegalArgumentException(
98  				"The 'url' parameter is deprecated; use 'uri' instead" );
99  			Object defaultURI = args.remove("uri");
100 			if ( defaultURI != null ) super.setUri(defaultURI);
101 				
102 			Object defaultContentType = args.remove("contentType");
103 			if ( defaultContentType != null ) 
104 				super.setContentType(defaultContentType);
105 			
106 			Object timeout = args.remove( "timeout" );
107 			if ( timeout != null ) setTimeout( (Integer) timeout );
108 
109 			if ( args.size() > 0 ) {
110 				String invalidArgs = "";
111 				for ( String k : args.keySet() ) invalidArgs += k + ",";
112 				throw new IllegalArgumentException("Unexpected keyword args: " + invalidArgs);
113 			}
114 		}
115 		this.initThreadPools( poolSize, threadPool );
116 	}
117 	
118 	/**
119 	 * Submits a {@link Callable} instance to the job pool, which in turn will 
120 	 * call {@link HTTPBuilder#doRequest(RequestConfigDelegate)} in an asynchronous 
121 	 * thread.  The {@link Future} instance returned by this value (which in 
122 	 * turn should be returned by any of the public <code>request</code> methods
123 	 * (including <code>get</code> and <code>post</code>) may be used to 
124 	 * retrieve whatever value may be returned from the executed response 
125 	 * handler closure. 
126 	 */
127 	@Override
128 	protected Future<?> doRequest( final RequestConfigDelegate delegate ) {
129 		return threadPool.submit( new Callable<Object>() {
130 			/*@Override*/ public Object call() throws Exception {
131 				try {
132 					return doRequestSuper(delegate);
133 				}
134 				catch( Exception ex ) {
135 					log.info( "Exception thrown from response delegate: " + delegate, ex );
136 					throw ex;
137 				}
138 			}
139 		});
140 	}
141 	
142 	/*
143 	 * Because we can't call "super.doRequest" from within the anonymous 
144 	 * Callable subclass.
145 	 */
146 	private Object doRequestSuper( RequestConfigDelegate delegate ) throws IOException {
147 		return super.doRequest(delegate);
148 	}
149 	
150 	/**
151 	 * Initializes threading parameters for the HTTPClient's 
152 	 * {@link ThreadSafeClientConnManager}, and this class' ThreadPoolExecutor. 
153 	 */
154 	protected void initThreadPools( final int poolSize, final ExecutorService threadPool ) {
155 		if (poolSize < 1) throw new IllegalArgumentException("poolSize may not be < 1");
156 		// Create and initialize HTTP parameters
157 		HttpParams params = client != null ? client.getParams()
158 				: new BasicHttpParams();
159 		ConnManagerParams.setMaxTotalConnections(params, poolSize);
160 		ConnManagerParams.setMaxConnectionsPerRoute(params,
161 				new ConnPerRouteBean(poolSize));
162 
163 		HttpProtocolParams.setVersion(params, HttpVersion.HTTP_1_1);
164 
165 		// Create and initialize scheme registry
166 		SchemeRegistry schemeRegistry = new SchemeRegistry();
167 		schemeRegistry.register( new Scheme( "http", 
168 				PlainSocketFactory.getSocketFactory(), 80 ) );
169 		schemeRegistry.register( new Scheme( "https", 
170 				SSLSocketFactory.getSocketFactory(), 443));
171 
172 		ClientConnectionManager cm = new ThreadSafeClientConnManager(
173 				params, schemeRegistry );
174 		super.client = new DefaultHttpClient( cm, params );
175 
176 		this.threadPool = threadPool != null ? threadPool :
177 			new ThreadPoolExecutor( poolSize, poolSize, 120, TimeUnit.SECONDS, 
178 					new LinkedBlockingQueue<Runnable>() );
179 	}
180 	
181 	/**
182 	 * {@inheritDoc}
183 	 */
184 	@Override
185 	protected Object defaultSuccessHandler( HttpResponseDecorator resp, Object parsedData )
186 			throws ResponseParseException {
187 		return super.defaultSuccessHandler( resp, parsedData );
188 	}
189 	
190 	/**
191 	 * For 'failure' responses (e.g. a 404), the exception will be wrapped in 
192 	 * a {@link ExecutionException} and held by the {@link Future} instance.  
193 	 * The exception is then re-thrown when calling {@link Future#get() 
194 	 * future.get()}.  You can access the original exception (e.g. an 
195 	 * {@link HttpResponseException}) by calling <code>ex.getCause()</code>.  
196 	 * 
197 	 */
198 	@Override
199 	protected void defaultFailureHandler( HttpResponseDecorator resp )
200 			throws HttpResponseException {
201 		super.defaultFailureHandler( resp );
202 	}
203 	
204 	/**
205 	 * This timeout is used for both the time to wait for an established 
206 	 * connection, and the time to wait for data.
207 	 * @see HttpConnectionParams#setSoTimeout(HttpParams, int)
208 	 * @see HttpConnectionParams#setConnectionTimeout(HttpParams, int)
209 	 * @param timeout time to wait in milliseconds.
210 	 */
211 	public void setTimeout( int timeout ) {
212 		HttpConnectionParams.setConnectionTimeout( super.getClient().getParams(), timeout );
213 		HttpConnectionParams.setSoTimeout( super.getClient().getParams(), timeout );
214 		/* this will cause a thread waiting for an available connection instance
215 		 * to time-out   */
216 //		ConnManagerParams.setTimeout( super.getClient().getParams(), timeout );		
217 	}
218 	
219 	/**
220 	 * Get the timeout in for establishing an HTTP connection.
221 	 * @return timeout in milliseconds.
222 	 */
223 	public int getTimeout() {
224 		return HttpConnectionParams.getConnectionTimeout( super.getClient().getParams() );
225 	}
226 	
227 	/**
228 	 * <p>Access the underlying threadpool to adjust things like job timeouts.</p>  
229 	 * 
230 	 * <p>Note that this is not the same pool used by the HttpClient's 
231 	 * {@link ThreadSafeClientConnManager}.  Therefore, increasing the 
232 	 * {@link ThreadPoolExecutor#setMaximumPoolSize(int) maximum pool size} will
233 	 * not in turn increase the number of possible concurrent requests.  It will
234 	 * simply cause more requests to be <i>attempted</i> which will then simply
235 	 * block while waiting for a free connection.</p>
236 	 * 
237 	 * @return the service used to execute requests.  By default this is a 
238 	 * {@link ThreadPoolExecutor}.
239 	 */
240 	public ExecutorService getThreadExecutor() {
241 		return this.threadPool;
242 	}
243 	
244 	/**
245 	 * {@inheritDoc}
246 	 */
247 	@Override public void shutdown() {
248 		super.shutdown(); 
249 		this.threadPool.shutdown();
250 	}
251 	
252 	/**
253 	 * {@inheritDoc}
254 	 * @see #shutdown()
255 	 */
256 	@Override protected void finalize() throws Throwable {
257 		this.shutdown();
258 		super.finalize();
259 	}
260 }