View Javadoc
1   /*
2    * Copyright 2008-2011 Thomas Nichols.  http://blog.thomnichols.org
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *     http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   *
16   * You are receiving this code free of charge, which represents many hours of
17   * effort from other individuals and corporations.  As a responsible member
18   * of the community, you are encouraged (but not required) to donate any
19   * enhancements or improvements back to the community under a similar open
20   * source license.  Thank you. -TMN
21   */
22  package groovyx.net.http;
23  
24  import java.io.IOException;
25  import java.net.URISyntaxException;
26  import java.util.Map;
27  import java.util.concurrent.Callable;
28  import java.util.concurrent.ExecutionException;
29  import java.util.concurrent.ExecutorService;
30  import java.util.concurrent.Future;
31  import java.util.concurrent.LinkedBlockingQueue;
32  import java.util.concurrent.ThreadPoolExecutor;
33  import java.util.concurrent.TimeUnit;
34  
35  import org.apache.http.HttpVersion;
36  import org.apache.http.conn.ClientConnectionManager;
37  import org.apache.http.conn.params.ConnManagerParams;
38  import org.apache.http.conn.params.ConnPerRouteBean;
39  import org.apache.http.conn.scheme.PlainSocketFactory;
40  import org.apache.http.conn.scheme.Scheme;
41  import org.apache.http.conn.scheme.SchemeRegistry;
42  import org.apache.http.conn.ssl.SSLSocketFactory;
43  import org.apache.http.impl.client.DefaultHttpClient;
44  import org.apache.http.impl.conn.tsccm.ThreadSafeClientConnManager;
45  import org.apache.http.params.BasicHttpParams;
46  import org.apache.http.params.HttpConnectionParams;
47  import org.apache.http.params.HttpParams;
48  import org.apache.http.params.HttpProtocolParams;
49  
50  /**
51   * This implementation makes all requests asynchronous by submitting jobs to a
52   * {@link ThreadPoolExecutor}.  All request methods (including <code>get</code>
53   * and <code>post</code>) return a {@link Future} instance, whose
54   * {@link Future#get() get} method will provide access to whatever value was
55   * returned from the response handler closure.
56   *
57   * @author <a href='mailto:tomstrummer+httpbuilder@gmail.com'>Tom Nichols</a>
58   */
59  public class AsyncHTTPBuilder extends HTTPBuilder {
60  
61      /**
62       * Default pool size is one is not supplied in the constructor.
63       */
64      public static final int DEFAULT_POOL_SIZE = 4;
65  
66      protected ExecutorService threadPool;
67  //      = (ThreadPoolExecutor)Executors.newCachedThreadPool();
68  
69      /**
70       * Accepts the following named parameters:
71       * <dl>
72       *  <dt>threadPool</dt><dd>Custom {@link ExecutorService} instance for
73       *      running submitted requests.  If this is an instance of {@link ThreadPoolExecutor},
74       *      the poolSize will be determined by {@link ThreadPoolExecutor#getMaximumPoolSize()}.
75       *      The default threadPool uses an unbounded queue to accept an unlimited
76       *      number of requests.</dd>
77       *  <dt>poolSize</dt><dd>Max number of concurrent requests</dd>
78       *  <dt>uri</dt><dd>Default request URI</dd>
79       *  <dt>contentType</dt><dd>Default content type for requests and responses</dd>
80       *  <dt>timeout</dt><dd>Timeout in milliseconds to wait for a connection to
81       *      be established and request to complete.</dd>
82       * </dl>
83       */
84      public AsyncHTTPBuilder( Map<String, ?> args ) throws URISyntaxException {
85          int poolSize = DEFAULT_POOL_SIZE;
86          ExecutorService threadPool = null;
87          if ( args != null ) {
88              threadPool = (ExecutorService)args.remove( "threadPool" );
89  
90              if ( threadPool instanceof ThreadPoolExecutor )
91                  poolSize = ((ThreadPoolExecutor)threadPool).getMaximumPoolSize();
92  
93              Object poolSzArg = args.remove("poolSize");
94              if ( poolSzArg != null ) poolSize = Integer.parseInt( poolSzArg.toString() );
95  
96              if ( args.containsKey( "url" ) ) throw new IllegalArgumentException(
97                  "The 'url' parameter is deprecated; use 'uri' instead" );
98              Object defaultURI = args.remove("uri");
99              if ( defaultURI != null ) super.setUri(defaultURI);
100 
101             Object defaultContentType = args.remove("contentType");
102             if ( defaultContentType != null )
103                 super.setContentType(defaultContentType);
104 
105             Object timeout = args.remove( "timeout" );
106             if ( timeout != null ) setTimeout( (Integer) timeout );
107 
108             if ( args.size() > 0 ) {
109                 String invalidArgs = "";
110                 for ( String k : args.keySet() ) invalidArgs += k + ",";
111                 throw new IllegalArgumentException("Unexpected keyword args: " + invalidArgs);
112             }
113         }
114         this.initThreadPools( poolSize, threadPool );
115     }
116 
117     /**
118      * Submits a {@link Callable} instance to the job pool, which in turn will
119      * call {@link HTTPBuilder#doRequest(RequestConfigDelegate)} in an asynchronous
120      * thread.  The {@link Future} instance returned by this value (which in
121      * turn should be returned by any of the public <code>request</code> methods
122      * (including <code>get</code> and <code>post</code>) may be used to
123      * retrieve whatever value may be returned from the executed response
124      * handler closure.
125      */
126     @Override
127     protected Future<?> doRequest( final RequestConfigDelegate delegate ) {
128         return threadPool.submit( new Callable<Object>() {
129             /*@Override*/ public Object call() throws Exception {
130                 try {
131                     return doRequestSuper(delegate);
132                 }
133                 catch( Exception ex ) {
134                     log.info( "Exception thrown from response delegate: " + delegate, ex );
135                     throw ex;
136                 }
137             }
138         });
139     }
140 
141     /*
142      * Because we can't call "super.doRequest" from within the anonymous
143      * Callable subclass.
144      */
145     private Object doRequestSuper( RequestConfigDelegate delegate ) throws IOException {
146         return super.doRequest(delegate);
147     }
148 
149     /**
150      * Initializes threading parameters for the HTTPClient's
151      * {@link ThreadSafeClientConnManager}, and this class' ThreadPoolExecutor.
152      */
153     protected void initThreadPools( final int poolSize, final ExecutorService threadPool ) {
154         if (poolSize < 1) throw new IllegalArgumentException("poolSize may not be < 1");
155         // Create and initialize HTTP parameters
156         HttpParams params = super.getClient().getParams();
157         ConnManagerParams.setMaxTotalConnections(params, poolSize);
158         ConnManagerParams.setMaxConnectionsPerRoute(params,
159                 new ConnPerRouteBean(poolSize));
160 
161         HttpProtocolParams.setVersion(params, HttpVersion.HTTP_1_1);
162 
163         // Create and initialize scheme registry
164         SchemeRegistry schemeRegistry = new SchemeRegistry();
165         schemeRegistry.register( new Scheme( "http",
166                 PlainSocketFactory.getSocketFactory(), 80 ) );
167         schemeRegistry.register( new Scheme( "https",
168                 SSLSocketFactory.getSocketFactory(), 443));
169 
170         ClientConnectionManager cm = new ThreadSafeClientConnManager(
171                 params, schemeRegistry );
172         setClient(new DefaultHttpClient( cm, params ));
173 
174         this.threadPool = threadPool != null ? threadPool :
175             new ThreadPoolExecutor( poolSize, poolSize, 120, TimeUnit.SECONDS,
176                     new LinkedBlockingQueue<Runnable>() );
177     }
178 
179     /**
180      * {@inheritDoc}
181      */
182     @Override
183     protected Object defaultSuccessHandler( HttpResponseDecorator resp, Object parsedData )
184             throws ResponseParseException {
185         return super.defaultSuccessHandler( resp, parsedData );
186     }
187 
188     /**
189      * For 'failure' responses (e.g. a 404), the exception will be wrapped in
190      * a {@link ExecutionException} and held by the {@link Future} instance.
191      * The exception is then re-thrown when calling {@link Future#get()
192      * future.get()}.  You can access the original exception (e.g. an
193      * {@link HttpResponseException}) by calling <code>ex.getCause()</code>.
194      *
195      */
196     @Override
197     protected void defaultFailureHandler( HttpResponseDecorator resp )
198             throws HttpResponseException {
199         super.defaultFailureHandler( resp );
200     }
201 
202     /**
203      * This timeout is used for both the time to wait for an established
204      * connection, and the time to wait for data.
205      * @see HttpConnectionParams#setSoTimeout(HttpParams, int)
206      * @see HttpConnectionParams#setConnectionTimeout(HttpParams, int)
207      * @param timeout time to wait in milliseconds.
208      */
209     public void setTimeout( int timeout ) {
210         HttpConnectionParams.setConnectionTimeout( super.getClient().getParams(), timeout );
211         HttpConnectionParams.setSoTimeout( super.getClient().getParams(), timeout );
212         /* this will cause a thread waiting for an available connection instance
213          * to time-out   */
214 //      ConnManagerParams.setTimeout( super.getClient().getParams(), timeout );
215     }
216 
217     /**
218      * Get the timeout in for establishing an HTTP connection.
219      * @return timeout in milliseconds.
220      */
221     public int getTimeout() {
222         return HttpConnectionParams.getConnectionTimeout( super.getClient().getParams() );
223     }
224 
225     /**
226      * <p>Access the underlying threadpool to adjust things like job timeouts.</p>
227      *
228      * <p>Note that this is not the same pool used by the HttpClient's
229      * {@link ThreadSafeClientConnManager}.  Therefore, increasing the
230      * {@link ThreadPoolExecutor#setMaximumPoolSize(int) maximum pool size} will
231      * not in turn increase the number of possible concurrent requests.  It will
232      * simply cause more requests to be <i>attempted</i> which will then simply
233      * block while waiting for a free connection.</p>
234      *
235      * @return the service used to execute requests.  By default this is a
236      * {@link ThreadPoolExecutor}.
237      */
238     public ExecutorService getThreadExecutor() {
239         return this.threadPool;
240     }
241 
242     /**
243      * {@inheritDoc}
244      */
245     @Override public void shutdown() {
246         super.shutdown();
247         this.threadPool.shutdown();
248     }
249 
250     /**
251      * {@inheritDoc}
252      * @see #shutdown()
253      */
254     @Override protected void finalize() throws Throwable {
255         this.shutdown();
256         super.finalize();
257     }
258 }