如何使用 Java 多线程检索 MySQL 数据

我的 MySQL 表中大约有 5000 万条记录。用 Java 检索这些记录需要 20 多个小时。最近，在处理了大约 500,000 条记录之后，我遇到了“通信链路故障”（Communications link failure）错误（参见相关问题）。有人可以告诉我，如何在 Java 中使用多线程来访问这些记录吗？每检索到一条记录，我都需要先做一些预处理，然后把结果保存到文本文件中。谢谢。

SQLManager.java

 package twcore.core.sql;

import java.io.File;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Iterator;

import twcore.core.BotSettings;
import twcore.core.SubspaceBot;
import twcore.core.events.SQLResultEvent;
import twcore.core.util.Tools;

/**
 * Thread-based main class for the core's SQL database functionality.
 * Initializes and manages SQL connection pools and queries, and runs
 * background queries on a semi-regular basis.
 *
 * Choosing a standard query vs. background/high-priority background query:
 *
 * Standard / foreground - Runs exactly when needed. Does not wait in
 * a queue to execute (unless connections are low). Does not require a unique
 * identifier or special event handling. However, a standard query will
 * pause the program thread until the results are returned. For large queries
 * and bad connections this may result in long delays and unresponsiveness.
 *
 * Background - Runs as a separate program thread. Waits in a
 * queue and can be delayed by other queries waiting to execute. Requires the
 * bot to catch an SQLResultEvent and use a unique identifier to refer to the
 * query. As a separate thread, after the background query is run, the bot
 * continues execution as normal, causing no delays. Ideal when multiple users
 * may need to access large amounts of SQL data from the same bot at the same
 * time without compromising responsiveness to the bot for others. However,
 * their individual result sets may return more slowly than with a standard query.
 *
 * High-priority background - Same as a background query, but added to the
 * head of the queue. Combines the versatility of a background queue with the
 * foreground's ability to return the result set almost instantly.
 *
 * IMPORTANT NOTE
 * For every query you MUST run BotAction's SQLClose(), or manually run the close()
 * method on both the ResultSet and the Statement that created it. If you do not,
 * memory leaks may occur!
 *
 * TODO:
 * Setup Apache Commons DBCP to remove CommunicationsExceptions:
 *   validationQuery="SELECT 1"
 *   testOnBorrow="true"
 */
public class SQLManager extends Thread {

    BotSettings sqlcfg;                             // Reference to SQL config file
    HashMap<String, SQLConnectionPool> pools;       // Connection pool storage, keyed by connection name
    HashMap<String, SQLBackgroundQueue> queues;     // Background queue storage, keyed by connection name
    boolean operational = true;                     // Status of SQL system

    // Length of time for thread to sleep, in ms, after all background queries are done.
    final static int THREAD_SLEEP_TIME = 30 * Tools.TimeInMillis.SECOND;
    // Time in ms between stale connection checks.
    final static int STALE_TIME = 15 * Tools.TimeInMillis.MINUTE;

    // Timestamp (ms) at or after which run() performs the next stale connection check.
    private long nextStaleCheck = 0;

    /**
     * Initialize SQL functionality with the information given in the specified
     * configuration file. One connection pool and one background queue are
     * created per configured connection. If every pool is created without an
     * SQLException, this thread is started; otherwise the manager is marked
     * non-operational and the thread is not started.
     *
     * @param configFile Properly formatted CFG file containing SQL system data
     */
    public SQLManager( File configFile ) {
        super( "SQLManager" );
        pools = new HashMap<String, SQLConnectionPool>();
        queues = new HashMap<String, SQLBackgroundQueue>();
        sqlcfg = new BotSettings( configFile );

        System.out.println( "=== SQL Initialization ===" );

        try {
            int connectionCount = sqlcfg.getInt( "ConnectionCount" );

            // Connection entries in the config file are numbered starting at 1.
            for( int i = 1; i <= connectionCount; i++ ) {
                String name = sqlcfg.getString( "Name" + i );

                // TODO: Migrate to a DataSource object and pass that to SQLConnectionPool
                // (com.mysql.jdbc.jdbc2.optional.MysqlDataSource)
                String dburl = "jdbc:mysql://" + sqlcfg.getString( "Server" + i )
                        + ":" + sqlcfg.getInt( "Port" + i )
                        + "/" + sqlcfg.getString( "Database" + i )
                        + "?user=" + sqlcfg.getString( "Login" + i )
                        + "&password=" + sqlcfg.getString( "Password" + i )
                        // Available properties (and info about them)
                        // http://dev.mysql.com/doc/refman/5.0/en/connector-j-reference-configuration-properties.html
                        + "&allowMultiQueries=true"
                        + "&maxReconnects=2147483647"
                        + "&initialTimeout=1"
                        + "&logSlowQueries=false"
                        + "&interactiveClient=true"
                        + "&autoReconnect=true"          // Auto-Reconnect not recommended
                        + "&autoReconnectForPools=true";

                // TODO: Better pooling solutions now exist that can be configured to our needs.
                SQLConnectionPool db = new SQLConnectionPool(
                        name,
                        dburl,
                        sqlcfg.getInt( "MinPoolSize" + i ),
                        sqlcfg.getInt( "MaxPoolSize" + i ),
                        sqlcfg.getInt( "WaitIfBusy" + i ),
                        sqlcfg.getString( "Driver" + i ) );

                pools.put( name, db );
                queues.put( name, new SQLBackgroundQueue() );
            }

            Tools.printLog( "SQL Connection Pools initialized successfully." );

            for( SQLConnectionPool pool : pools.values() ) {
                Tools.printLog( pool.toString() );
            }
        } catch( SQLException e ) {
            Tools.printLog( "Failed to load SQL Connection Pools. Driver missing?" );
            operational = false;
            Tools.printLog( e.getMessage() );
        }

        // Must be set BEFORE start(): run() reads this field, and starting the
        // thread first would let it observe 0 and run a stale check immediately.
        nextStaleCheck = System.currentTimeMillis() + STALE_TIME;

        if( operational ) {
            start();
            Tools.printLog( "SQL Background Queues initialized." );
        } else {
            Tools.printLog( "SQL Background Queues NOT initialized." );
        }

        System.out.println();
    }

    /**
     * Adds a regular background query to the end of the queue. If there are no
     * queued queries ahead of it, the background query will be executed nearly
     * as quickly as a regularly executed query, but without delaying the bot's
     * thread to retrieve the result set. The query is instead run in a new
     * thread and returned to the bot via an SQLResultEvent, and is identified by
     * a unique key (identifier).
     *
     * If the SQL system is not operational, or the connection name is unknown,
     * the problem is logged and the query is dropped.
     *
     * @param connName Name of the connection as defined in sql.cfg
     * @param identifier The unique identifier for this query
     * @param query A properly-formed SQL query
     * @param bot The bot requesting the query (if unsure, use this)
     */
    public void queryBackground( String connName, String identifier, String query, SubspaceBot bot ) {
        if( !operational ) {
            Tools.printLog( "Unable to process query: " + query );
            return;
        }

        if( !pools.containsKey( connName ) ) {
            Tools.printLog( "Invalid connection name supplied: '" + connName + "'" );
            return;
        }

        SQLBackgroundQueue queue = queues.get( connName );
        queue.addQuery( new SQLResultEvent( query, identifier, bot ) );
        // Wake the manager thread if it is sleeping in run().
        interrupt();
    }

    /**
     * Adds a background query to the front of the queue. A high-priority
     * background query will be executed nearly as quickly as a regularly
     * executed query, but without delaying the bot's thread to retrieve the
     * results. The query is instead run in a new thread and returned to the bot
     * via an SQLResultEvent, and is identified by a unique key (identifier).
     *
     * If the SQL system is not operational, or the connection name is unknown,
     * the problem is logged and the query is dropped.
     *
     * @param connName Name of the connection as defined in sql.cfg
     * @param identifier The unique identifier for this query
     * @param query A properly-formed SQL query
     * @param bot The bot requesting the query (if unsure, use this)
     */
    public void queryBackgroundHighPriority( String connName, String identifier, String query, SubspaceBot bot ) {
        if( !operational ) {
            Tools.printLog( "Unable to process background high priority query: " + query );
            return;
        }

        if( !pools.containsKey( connName ) ) {
            Tools.printLog( "Invalid connection name supplied: '" + connName + "'" );
            return;
        }

        SQLBackgroundQueue queue = queues.get( connName );
        queue.addHighPriority( new SQLResultEvent( query, identifier, bot ) );
        // Wake the manager thread if it is sleeping in run().
        interrupt();
    }

    /**
     * Runs a regular SQL query using the specified database connection. Your
     * bot's thread will not continue while the query is in effect. Use a
     * background query if you wish for the thread to continue while the query
     * is executed.
     *
     * @param connectionName Name of the connection as defined in sql.cfg
     * @param query A properly-formed SQL query
     * @return The result set of the query (MAY be null); null is also returned
     *         when the SQL system is not operational or the connection name is unknown
     * @throws SQLException if the underlying connection pool fails to run the query
     */
    public ResultSet query( String connectionName, String query ) throws SQLException {
        if( !operational ) {
            Tools.printLog( "Unable to process query: " + query );
            return null;
        }

        if( !pools.containsKey( connectionName ) ) {
            Tools.printLog( "Invalid connection name supplied: '" + connectionName + "'" );
            return null;
        }

        return pools.get( connectionName ).query( query );
    }

    /**
     * Creates a PreparedStatement.
     * Gets a Connection from the specified SQLConnectionPool (specified by the connectionName)
     * and creates a PreparedStatement object using the specified query.
     * Note that this sets the connection to "busy" in the SQLConnectionPool so it isn't used by other processes.
     *
     * You need to free it when the bot doesn't use the PreparedStatement anymore or this will be a Connection-leak !!
     *
     * @param connectionName Name of the connection as defined in sql.cfg
     * @param uniqueID A unique string that is used for re-using (busy) Connections in the connection pool.
     *                 This is only used for PreparedStatements as their Connection is locked when a bot
     *                 creates a PreparedStatement.
     * @param sqlstatement The (dynamic) SQL INSERT/UPDATE statement that will be pre-parsed for the PreparedStatement
     * @param retrieveAutoGeneratedKeys whether auto-generated keys should be returned
     * @return PreparedStatement object or null if there was an error
     */
    public PreparedStatement createPreparedStatement( String connectionName, String uniqueID, String sqlstatement, boolean retrieveAutoGeneratedKeys ) {
        if( !operational ) {
            Tools.printLog( "Unable to create PreparedStatement object; SQL System is not operational" );
            return null;
        }

        if( !pools.containsKey( connectionName ) ) {
            return null;
        }

        SQLConnectionPool pool = pools.get( connectionName );

        try {
            // Have we hit the maximum number of allowed connections in the pool?
            if( !pool.isAvailable() && pool.totalConnections() >= pool.getMaxConnections() ) {
                Tools.printLog( "No more connections available in pool '" + connectionName + "' to create PreparedStatement!" );
                return null;
            }

            Connection conn = pool.getConnection( uniqueID );
            int keyMode = retrieveAutoGeneratedKeys
                    ? Statement.RETURN_GENERATED_KEYS
                    : Statement.NO_GENERATED_KEYS;
            return conn.prepareStatement( sqlstatement, keyMode );
        } catch( SQLException sqle ) {
            Tools.printLog( "SQLException encountered while trying to create a PreparedStatement from a Connection from '" + connectionName + "':" + sqle.getMessage() );
            return null;
        }
    }

    /**
     * Frees specified Connection for specified connectionpool using specified unique ID.
     * This should be used when closing a PreparedStatement as it locks a connection on creation.
     * Does nothing (beyond logging) when the SQL system is not operational, and
     * silently ignores unknown connection names.
     *
     * @param connectionName Name of the connection as defined in sql.cfg
     * @param uniqueID The unique ID used to create the Prepared Statement
     * @param conn Connection used when creating a PreparedStatement
     */
    public void freeConnection( String connectionName, String uniqueID, Connection conn ) {
        if( !operational ) {
            Tools.printLog( "Unable to free Connection; SQL System is not operational" );
            return;
        }

        if( pools.containsKey( connectionName ) ) {
            pools.get( connectionName ).free( uniqueID, conn );
        }
    }

    /**
     * @return True if the SQL system is operational
     */
    public boolean isOperational() {
        return operational;
    }

    /**
     * Prints to the log file the status of all connection pools.
     */
    public void printStatusToLog() {
        if( !operational ) {
            Tools.printLog( "SQL Connection Not Operational" );
        } else {
            Tools.spamLog( getPoolStatus() );
        }
    }

    /**
     * Gets status of all connection pools.
     *
     * @return String array containing status of each individual connection pool.
     */
    public String[] getPoolStatus() {
        String[] status = new String[pools.size()];
        Iterator<SQLConnectionPool> i = pools.values().iterator();

        // NOTE(review): the loop condition and body were lost when this file was
        // extracted (everything between "j <" and the next ">" was stripped).
        // Reconstructed on the assumption that each entry is the pool's toString(),
        // as the constructor logs pools the same way -- confirm against upstream.
        for( int j = 0; j < status.length && i.hasNext(); j++ ) {
            status[j] = i.next().toString();
        }

        return status;
    }

    /**
     * Manager loop. On every pass it hands queued background queries to new
     * SQLWorker instances for as long as a queue is non-empty and its pool has
     * not reached its background limit; then, once STALE_TIME has elapsed since
     * the last check, asks every pool to update its stale connections; and
     * finally sleeps for THREAD_SLEEP_TIME. The sleep ends early when this
     * thread is interrupted, which this class does whenever a query is queued.
     *
     * NOTE(review): the method header, the declaration of checkForStales and the
     * loop condition were lost in extraction. They are reconstructed here from
     * how the surviving body uses them; the loop is assumed to run forever --
     * confirm against upstream.
     */
    public void run() {
        boolean checkForStales;

        while( true ) {
            // Dispatch as many queued background queries as each pool allows.
            for( String name : queues.keySet() ) {
                SQLBackgroundQueue queue = queues.get( name );
                SQLConnectionPool pool = pools.get( name );

                while( !queue.isEmpty() && !pool.reachedMaxBackground() ) {
                    SQLResultEvent event = queue.getNextInLine();

                    try {
                        // The worker starts its own thread from its constructor.
                        new SQLWorker( pool, event, this );
                    } catch( Exception e ) {
                        Tools.printLog( "Uncaught exception encountered running background query." );
                        Tools.printStackTrace( e );
                    }
                }
            }

            // Perform stale check
            checkForStales = ( nextStaleCheck < System.currentTimeMillis() );

            if( checkForStales ) {
                for( SQLConnectionPool pool : pools.values() ) {
                    pool.updateStaleConnections();
                }
                nextStaleCheck = System.currentTimeMillis() + STALE_TIME;
            }

            try {
                Thread.sleep( THREAD_SLEEP_TIME );
            } catch( InterruptedException e ) {
                // Deliberately ignored: interrupt() is this class's wake-up signal
                // for "there is new work", so simply loop around again.
            }
        }
    }
}

SQLWorker.java

 package twcore.core.sql;

import java.sql.ResultSet;
import java.sql.SQLException;

import twcore.core.events.SQLResultEvent;
import twcore.core.util.Tools;

/**
 * Runs a background SQL query given a connection pool to use and an undelivered
 * SQLResultEvent object to place the results into. By handling in a separate
 * thread, it frees the bot process of having to wait on a query.
 */
public class SQLWorker implements Runnable {

    private SQLResultEvent m_event;     // Event to hand the ResultSet to
    private SQLConnectionPool m_pool;   // Connection pool to run query on
    private SQLManager m_manager;       // For interrupting any waits

    /**
     * Creates a new SQLWorker and begins a background query, given a connection
     * pool to use for the query, an event to place the result set returned by
     * the query into, and an SQLManager to wake up/interrupt when the process
     * has finished (if it is currently sleeping). The pool's background count
     * is incremented here and a new thread named "SQLWorker" is started
     * immediately.
     *
     * @param pool Connection pool to use to run the query
     * @param event Event that will afterward contain the returned ResultSet
     * @param manager Waiting object to interrupt when finished
     */
    public SQLWorker( SQLConnectionPool pool, SQLResultEvent event, SQLManager manager ) {
        m_pool = pool;
        m_manager = manager;
        m_event = event;

        m_pool.incrementBackgroundCount();

        // All fields are assigned before the thread that reads them is started.
        Thread t = new Thread( this, "SQLWorker" );
        t.start();
    }

    /**
     * Runs the SQL query found in the SQLResultEvent the SQLWorker was instantiated
     * with and stores the returned ResultSet inside the event. An SQLException
     * is logged rather than propagated.
     *
     * Whether or not the query succeeds, the background count of the connection
     * pool is reduced by one and the SQLManager is interrupted afterwards.
     * Doing this in a finally block matters: if a failed query skipped the
     * decrement, its slot would never be released and the pool's background
     * count would only ever grow.
     */
    public void run() {
        try {
            ResultSet set = m_pool.query( m_event.getQuery() );
            m_event.setResultSet( set );
        } catch( SQLException e ) {
            Tools.printLog("SQLException encountered while running background query in SQLWorker.");
            Tools.printStackTrace( e );
        } finally {
            // Always give the slot back and wake the manager, even on failure.
            m_pool.decrementBackgroundCount();
            m_manager.interrupt();
        }
    }
}

还有更多的文件，但由于已经达到了本问题的字数上限，我无法把它们全部贴出来，只能提供下面的 URL 供参考：

http://www.twcore.org/browser/trunk/twcore/src/twcore/core/sql