让现有的Java代码以并行/多线程方式运行

我有一个非常简单的爬虫。 我想让我当前的代码在多个线程中运行。 你能给我一些教程或文章来帮助我实现这一点吗?

我原来是.Net开发人员,在.Net中以多线程方式运行代码对我来说没有任何问题,但遗憾的是我对Java中的线程一无所知。

我的爬虫是一个命令行软件,所以不用担心GUI。

先谢谢你。

Java通过Thread类实现多线程处理。 让现有代码以多线程方式运行的最常见方法之一,是用Runnable接口定义线程启动时要调用的内容,然后启动该线程。

/**
 * Placeholder tasks used to illustrate how existing static methods can be run
 * on their own threads.
 */
public class SomeFunctions {

    public static void FunctionA() {}

    public static void FunctionB() {}

    public static void FunctionC() {}

    /**
     * Demonstrates the basic pattern: wrap the call in a {@link Runnable},
     * hand it to a {@link Thread}, and start the thread.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                SomeFunctions.FunctionA();
            }
        });
        t1.start();
        // (rinse and repeat for the other functions)
    }
}

以上代码是随手写的,没有实际编译测试过,但至少应该能传达大致的思路。 当然,一旦进入多线程领域,你就会遇到并发问题,需要确保所有内容都经过适当的同步,等等,但任何语言都会遇到这些问题。

如果您担心同步问题,可以使用一些工具。 最简单的是Java内置的可重入(递归)互斥锁功能,即“synchronized”关键字。 java.util.concurrent和java.util.concurrent.locks包中的各种类也提供了更经典的同步手段,例如Semaphore和ReadWriteLock。

http://download.oracle.com/javase/6/docs/api/java/util/concurrent/package-summary.html http://download.oracle.com/javase/6/docs/api/java/util/concurrent/locks/package-summary.html

您可以查看我的webcrawler示例。 抱歉篇幅有点长。

 import java.net.MalformedURLException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
 * A web crawler with a Worker pool.
 *
 * <p>The crawler is the {@link Manager} of its workers: it hands out URLs to
 * crawl via {@link #getNewJob()} and collects what each job found via
 * {@link #jobCompleted(Job)}. Both methods are {@code synchronized} on this
 * instance, which guards all of the collections below.
 *
 * @author Adriaan
 */
public class WebCrawler implements Manager {

    private Set<Worker> workers = new HashSet<Worker>();
    private List<String> toCrawl = new ArrayList<String>();
    private Set<String> crawled = new HashSet<String>();
    private Set<String> hosts = new HashSet<String>();
    private Set<String> results = new HashSet<String>();
    private int maxResults;

    /**
     * Creates the crawler, queues the seed URL and creates the workers.
     *
     * @param url             the first URL to crawl
     * @param numberOfWorkers how many workers to create
     * @param maxResults      number of distinct results after which the workers are told to stop
     */
    public WebCrawler(String url, int numberOfWorkers, int maxResults) {
        this.maxResults = maxResults;
        toCrawl.add(url);
        createWorkers(numberOfWorkers);
    }

    /**
     * Creates the given number of workers managed by this crawler.
     *
     * @param numberOfWorkers how many workers to create
     */
    public void createWorkers(int numberOfWorkers) {
        for (int i = 0; i < numberOfWorkers; i++) {
            workers.add(new Worker(this));
        }
    }

    /** Asks every worker to terminate. */
    private void stopWorkers() {
        for (Worker worker : workers) {
            worker.terminate();
        }
    }

    /**
     * Blocks until a URL is available, then wraps it in a crawl job.
     *
     * @return a job for the URL at the head of the queue
     */
    public synchronized Job getNewJob() {
        while (toCrawl.size() == 0) {
            try {
                wait();
            } catch (InterruptedException e) {
                // Deliberately ignored: the loop re-checks the queue and keeps
                // waiting, because this method must always return a job.
            }
        }
        return new EmailAddressCrawlJob().setDescription(toCrawl.remove(0));
    }

    /**
     * Records what a finished job found: marks its URL as crawled, queues the
     * not-yet-crawled URLs it discovered, stores new results and wakes up
     * workers waiting for a job. Once enough results are collected the workers
     * are told to stop and a host summary is printed.
     *
     * @param job the job that has finished executing
     */
    public synchronized void jobCompleted(Job job) {
        crawled.add(job.getDescription());
        String host = getHost(job.getDescription());
        boolean knownHost = hosts.contains(host);
        if (!knownHost) {
            System.out.println("host: " + host);
            hosts.add(host);
        }
        for (String url : job.getNewDescriptions()) {
            if (!crawled.contains(url)) {
                if (knownHost) {
                    // Insert just before the last queued URL. Clamp to 0 so an
                    // empty queue does not yield index -1.
                    toCrawl.add(Math.max(0, toCrawl.size() - 1), url);
                } else {
                    toCrawl.add(url);
                }
            }
        }
        for (String result : job.getResults()) {
            if (results.add(result)) {
                System.out.println("result: " + result);
            }
        }
        notifyAll();
        if (results.size() >= maxResults) {
            stopWorkers();
            System.out.println("Crawled hosts:");
            for (String crawledHost : hosts) {
                System.out.println(crawledHost);
            }
            Set<String> uncrawledHosts = new HashSet<String>();
            for (String toCrawlUrl : toCrawl) {
                uncrawledHosts.add(getHost(toCrawlUrl));
            }
            System.out.println("Uncrawled hosts:");
            for (String unCrawledHost : uncrawledHosts) {
                System.out.println(unCrawledHost);
            }
        }
        if (crawled.size() % 10 == 0) {
            System.out.println("crawled=" + crawled.size() + " toCrawl=" + toCrawl.size()
                    + " results=" + results.size() + " hosts=" + hosts.size()
                    + " lastHost=" + host);
        }
    }

    /**
     * Extracts the host part of a URL string: the text between {@code "://"}
     * and the next {@code "/"} (or the end of the string).
     *
     * @param host the URL string
     * @return the host part, or the input unchanged when it contains no {@code "://"}
     */
    public String getHost(String host) {
        int schemeEnd = host.indexOf("://");
        if (schemeEnd < 0) {
            // No scheme separator: there is nothing to strip.
            return host;
        }
        int hostStart = schemeEnd + 3;
        int hostEnd = host.indexOf("/", hostStart);
        if (hostEnd < 0) {
            hostEnd = host.length();
        }
        return host.substring(hostStart, hostEnd);
    }

    public static void main(String[] args) throws MalformedURLException {
        new WebCrawler("http://www.nu.nl/", 5, 20);
    }
}

Worker(工作线程)

 ** * A Worker proactively gets a Job, executes it and notifies its manager that * the Job is completed. * * @author Adriaan */ public class Worker extends Thread { private final Manager manager; private Job job = null; private boolean isWorking; public Worker(Manager manager) { this.manager = manager; isWorking = true; start(); } @Override public void run() { System.out.println("Worker " + Thread.currentThread().getId() + " starting "); while (isWorking) { job = manager.getNewJob(); job.execute(); manager.jobCompleted(job); } } public void terminate() { isWorking = false; } } 

Manager接口

 /**
 * Contract between Workers and the component that supplies their jobs.
 *
 * @author Adriaan
 */
public interface Manager {

    /**
     * Hands out a new job.
     *
     * @return the job to work on
     */
    Job getNewJob();

    /**
     * Reports that a job has been completed.
     *
     * @param job the completed job
     */
    void jobCompleted(Job job);
}

Job(作业)

 import java.util.HashSet;
import java.util.Set;

/**
 * A Job is a unit of work defined by a String (the description). During
 * execution the job can obtain results and new job descriptions.
 *
 * @author Adriaan
 */
public abstract class Job {

    private String description;
    private Set<String> results = new HashSet<String>();
    private Set<String> newDescriptions = new HashSet<String>();

    /**
     * Sets the job description.
     *
     * @param description the description identifying this job
     * @return this for chaining
     */
    public Job setDescription(String description) {
        this.description = description;
        return this;
    }

    /**
     * Executes the job.
     */
    public abstract void execute();

    /**
     * Gets the results obtained.
     *
     * @return the set holding every result added so far (not a copy)
     */
    public Set<String> getResults() {
        return results;
    }

    /**
     * Gets the new job descriptions obtained.
     *
     * @return the set holding every description added so far (not a copy)
     */
    public Set<String> getNewDescriptions() {
        return newDescriptions;
    }

    /**
     * Gets the job description.
     *
     * @return the description, or {@code null} if none has been set
     */
    public String getDescription() {
        return description;
    }

    /**
     * Allows the implementation to add an obtained result.
     *
     * @param result the result to record; duplicates are kept only once
     */
    void addResult(String result) {
        results.add(result);
    }

    /**
     * Allows the implementation to add an obtained description.
     *
     * @param newDescription the description to record; duplicates are kept only once
     */
    void addNewDescription(String newDescription) {
        newDescriptions.add(newDescription);
    }
}

抓取电子邮件地址页面的作业:

 import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLConnection;
import java.util.StringTokenizer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * A Job which crawls HTTP or HTTPS URLs for email addresses, collecting new
 * URLs to crawl along the way.
 *
 * @author Adriaan
 */
public class EmailAddressCrawlJob extends Job {

    /** Matches an email address; group 1 is the whole address. */
    private static final Pattern EMAIL_PATTERN = Pattern
            .compile("([\\w\\-]([\\.\\w])+[\\w]+@([\\w\\-]+\\.)+[A-Za-z]{2,4})");

    /** Characters that end the value of an href attribute. */
    private static final String LINK_DELIMITERS = "\t\n\r\">#";

    /**
     * Downloads the page named by the job description, then records the links
     * and email addresses found in it. A malformed description is reported on
     * stderr and otherwise skipped.
     */
    @Override
    public void execute() {
        try {
            URL url = new URL(getDescription());
            String text = readText(url);
            extractNewDescriptions(text, url);
            extractResults(text);
        } catch (MalformedURLException e) {
            System.err.println("Bad url " + getDescription());
        }
    }

    /**
     * Reads the whole response body for the given URL.
     *
     * @param url the page to download
     * @return the body decoded with the platform default charset, or an empty
     *         string when the page is empty or cannot be read
     */
    private String readText(URL url) {
        InputStream input = null;
        try {
            URLConnection connection = url.openConnection();
            input = connection.getInputStream();
            // Collect raw bytes first and decode once at the end, so that a
            // multi-byte character split across two reads is not corrupted.
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            byte[] buffer = new byte[1000];
            int num;
            while ((num = input.read(buffer)) != -1) {
                bytes.write(buffer, 0, num);
            }
            return bytes.toString();
        } catch (IOException e) {
            // Best effort: an unreadable page simply contributes nothing.
            return "";
        } finally {
            if (input != null) {
                try {
                    input.close();
                } catch (IOException ignored) {
                    // Nothing useful can be done if closing fails.
                }
            }
        }
    }

    /**
     * Scans the page for anchor tags and records every http/https link target
     * as a new job description. Relative links are resolved against the page URL.
     *
     * @param text the page content
     * @param url  the URL the content was read from
     */
    private void extractNewDescriptions(String text, URL url) {
        // URL extracting code from Sun example
        String lowerCaseContent = text.toLowerCase();
        int index = 0;
        while ((index = lowerCaseContent.indexOf("<a", index)) != -1) {
            if ((index = lowerCaseContent.indexOf("href", index)) == -1) {
                break;
            }
            if ((index = lowerCaseContent.indexOf("=", index)) == -1) {
                break;
            }
            index++;
            String remaining = text.substring(index);
            StringTokenizer st = new StringTokenizer(remaining, LINK_DELIMITERS);
            if (!st.hasMoreTokens()) {
                // "href=" with nothing usable after it.
                continue;
            }
            String strLink = st.nextToken();
            if (strLink.startsWith("javascript:")) {
                continue;
            }
            URL urlLink;
            try {
                urlLink = new URL(url, strLink);
                strLink = urlLink.toString();
            } catch (MalformedURLException e) {
                // Not a usable link; skip it.
                continue;
            }
            // only look at http links
            String protocol = urlLink.getProtocol();
            if (!"http".equals(protocol) && !"https".equals(protocol)) {
                continue;
            }
            addNewDescription(strLink);
        }
    }

    /**
     * Records every email address found in the page as a result.
     *
     * @param text the page content
     */
    private void extractResults(String text) {
        Matcher m = EMAIL_PATTERN.matcher(text);
        while (m.find()) {
            addResult(m.group(1));
        }
    }
}

我知道这个答案有点冗长,但我认为一个可以运行的例子可能对OP最有帮助,而我恰好不久前写过一个。

下面是一个非常基本的Java程序,可以帮助你对多线程有一个大致的概念。

 /**
 * Thread that prints its word once per second until it is interrupted.
 */
public class MyThread extends Thread {

    String word;

    public MyThread(String rm) {
        this.word = rm;
    }

    /** Prints the word, sleeps for a second, and repeats; stops on interruption. */
    @Override
    public void run() {
        try {
            while (true) {
                System.out.println(word);
                Thread.sleep(1000);
            }
        } catch (InterruptedException e) {
            System.out.println("sleep interrupted");
        }
    }

    /** Starts two threads that print concurrently. */
    public static void main(String[] args) {
        Thread first = new MyThread("First Thread");
        Thread second = new MyThread("Second Thread");
        first.start();
        second.start();
    }
}

输出将是……

 First Thread Second Thread First Thread Second Thread First Thread 

可以参考这份PPT,它会帮助您掌握基础知识。

这里