Commit 918be6d8 by Jeffrey Chiang

git init

parents
package com.jiangjl.concurrency;
import com.jiangjl.concurrency.j.simple.RunnableEmployee;
import com.jiangjl.concurrency.spec.Recordable;
import com.jiangjl.concurrency.stm.sharedmutable.STMWhiteBoard;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class Main {
private static ExecutorService executorService = Executors.newFixedThreadPool(10);
private static Logger logger = LoggerFactory.getLogger(Main.class);
public static void main(String[] args) throws InterruptedException {
Recordable whiteBoard = new STMWhiteBoard();
for (int i = 0; i < 20; i++) {
executorService.submit(new RunnableEmployee(whiteBoard));
}
executorService.shutdown();
if (executorService.awaitTermination(20, TimeUnit.SECONDS)) {
logger.info(whiteBoard.getResult());
}
System.exit(1);
}
}
package com.jiangjl.concurrency.actor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import com.jiangjl.concurrency.actor.isolatedmutable.actor.IMHostActor;
public class ActorMain {
public static void main(String[] args) {
final ActorSystem system = ActorSystem.create("system");
// final ActorRef hostActor = system.actorOf(Props.create(SMHostActor.class), "host-actor");
final ActorRef hostActor = system.actorOf(Props.create(IMHostActor.class), "host-actor");
// final ActorRef hostActor = system.actorOf(Props.create(IHostActor.class), "host-actor");
hostActor.tell(20, ActorRef.noSender());
system.terminate();
}
}
package com.jiangjl.concurrency.actor.immutable.actor;
import akka.actor.AbstractActor;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import com.jiangjl.concurrency.actor.immutable.domain.EmployeeWIthPreTotalYear;
import com.jiangjl.concurrency.spec.Employee;
import com.jiangjl.concurrency.utils.RandomUtils;
public class CalculateActor extends AbstractActor implements Employee {
private LoggingAdapter logger = Logging.getLogger(getContext().system(), this);
@Override
public Receive createReceive() {
return receiveBuilder().match(EmployeeWIthPreTotalYear.class, employee -> {
getSender().tell(employee.getWorkYear() + employee.getPreTotalYear(), getSelf());
logger.info("{} add work year {} to pre total year {}", employee.getName(), employee.getWorkYear(), employee.getPreTotalYear());
}).build();
}
@Override
public String getName() {
return RandomUtils.randomName();
}
@Override
public Integer getWorkYear() {
return RandomUtils.randomYear();
}
}
package com.jiangjl.concurrency.actor.immutable.actor;
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.pattern.Patterns;
import akka.util.Timeout;
import com.jiangjl.concurrency.actor.immutable.domain.EmployeeWIthPreTotalYear;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import java.util.concurrent.TimeUnit;
public class IHostActor extends AbstractActor {
private final ActorRef calculateRef;
private LoggingAdapter logger = Logging.getLogger(getContext().system(), this);
public IHostActor() {
this.calculateRef = getContext().actorOf(Props.create(CalculateActor.class), "employee-actor");
}
@Override
public Receive createReceive() {
return receiveBuilder().match(Integer.class, count -> {
logger.info("total employee count: {}", count);
Integer totalYear = 0;
for (int i = 0; i < count; i++) {
Future future = Patterns.ask(calculateRef, new EmployeeWIthPreTotalYear(totalYear), Timeout.apply(1, TimeUnit.SECONDS));
totalYear = (Integer) Await.result(future, Duration.apply(1, TimeUnit.SECONDS));
}
logger.info("count {}, result: {}", totalYear);
}).build();
}
}
package com.jiangjl.concurrency.actor.immutable.domain;
import com.jiangjl.concurrency.utils.RandomUtils;
public class EmployeeWIthPreTotalYear {
private final String name;
private final Integer workYear;
private final Integer preTotalYear;
public EmployeeWIthPreTotalYear(Integer preTotalYear) {
this.name = RandomUtils.randomName();
this.workYear = RandomUtils.randomYear();
this.preTotalYear = preTotalYear;
}
public String getName() {
return name;
}
public Integer getWorkYear() {
return workYear;
}
public Integer getPreTotalYear() {
return preTotalYear;
}
}
package com.jiangjl.concurrency.actor.isolatedmutable.actor;
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.pattern.Patterns;
import akka.util.Timeout;
import com.jiangjl.concurrency.actor.isolatedmutable.domain.SimpleEmployee;
import com.jiangjl.concurrency.actor.sharedmutable.domain.BriefNote;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
public class IMHostActor extends AbstractActor {
private final ActorRef reportorRef;
private LoggingAdapter logger = Logging.getLogger(getContext().system(), this);
public IMHostActor() {
this.reportorRef = getContext().actorOf(Props.create(ReportWorkYearActor.class), "employee-actor");
}
@Override
public Receive createReceive() {
return receiveBuilder().match(Integer.class, count -> {
logger.info("total employee count: {}", count);
BriefNote briefNote = new BriefNote();
List<Future> futures = new ArrayList<>();
for (int i = 0; i < count; i++) {
futures.add(Patterns.ask(reportorRef, new SimpleEmployee(), Timeout.apply(1, TimeUnit.SECONDS)));
}
for (Future future : futures) {
int workYear = (Integer) Await.result(future, Duration.apply(1, TimeUnit.SECONDS));
briefNote.addYear("", workYear);
}
logger.info(briefNote.getResult());
}).build();
}
}
package com.jiangjl.concurrency.actor.isolatedmutable.actor;
import akka.actor.AbstractActor;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import com.jiangjl.concurrency.actor.isolatedmutable.domain.SimpleEmployee;
import com.jiangjl.concurrency.spec.Employee;
import com.jiangjl.concurrency.utils.RandomUtils;
public class ReportWorkYearActor extends AbstractActor implements Employee {
private LoggingAdapter logger = Logging.getLogger(getContext().system(), this);
@Override
public Receive createReceive() {
return receiveBuilder().match(SimpleEmployee.class, employee -> {
getSender().tell(employee.getWorkYear(), getSelf());
logger.info("{} report work year {}", employee.getName(), employee.getWorkYear());
}).build();
}
@Override
public String getName() {
return RandomUtils.randomName();
}
@Override
public Integer getWorkYear() {
return RandomUtils.randomYear();
}
}
package com.jiangjl.concurrency.actor.isolatedmutable.domain;
import com.jiangjl.concurrency.utils.RandomUtils;
public class SimpleEmployee {
private final String name;
private final Integer workYear;
public SimpleEmployee() {
this.name = RandomUtils.randomName();
this.workYear = RandomUtils.randomYear();
}
public String getName() {
return name;
}
public Integer getWorkYear() {
return workYear;
}
}
package com.jiangjl.concurrency.actor.sharedmutable.actor;
import akka.actor.AbstractActor;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import com.jiangjl.concurrency.actor.sharedmutable.domain.EmployeeWithNote;
import com.jiangjl.concurrency.spec.Employee;
import com.jiangjl.concurrency.utils.RandomUtils;
public class EmployeeWithNoteActor extends AbstractActor implements Employee {
private LoggingAdapter logger = Logging.getLogger(getContext().system(), this);
@Override
public Receive createReceive() {
return receiveBuilder().match(EmployeeWithNote.class, employee -> {
employee.getBriefNote().addYear(employee.getName(), employee.getWorkYear());
getSender().tell(employee.getBriefNote(), getSelf());
logger.info("add {} work year {}, and return briefNote", employee.getName(), employee.getWorkYear());
}).build();
}
@Override
public String getName() {
return RandomUtils.randomName();
}
@Override
public Integer getWorkYear() {
return RandomUtils.randomYear();
}
}
package com.jiangjl.concurrency.actor.sharedmutable.actor;
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.pattern.Patterns;
import akka.util.Timeout;
import com.jiangjl.concurrency.actor.sharedmutable.domain.BriefNote;
import com.jiangjl.concurrency.actor.sharedmutable.domain.EmployeeWithNote;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import java.util.concurrent.TimeUnit;
public class SMHostActor extends AbstractActor {
private final ActorRef employeeActor;
private LoggingAdapter logger = Logging.getLogger(getContext().system(), this);
public SMHostActor() {
this.employeeActor = getContext().actorOf(Props.create(EmployeeWithNoteActor.class), "employee-actor");
}
@Override
public Receive createReceive() {
return receiveBuilder().match(Integer.class, count -> {
logger.info("total employee count: {}", count);
BriefNote briefNote = new BriefNote();
Future future;
for (int i = 0; i < count; i++) {
future = Patterns.ask(employeeActor, new EmployeeWithNote(briefNote), Timeout.apply(1, TimeUnit.SECONDS));
briefNote = (BriefNote) Await.result(future, Duration.apply(1, TimeUnit.SECONDS));
}
logger.info(briefNote.getResult());
}).build();
}
}
package com.jiangjl.concurrency.actor.sharedmutable.domain;
import com.jiangjl.concurrency.spec.Recordable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class BriefNote implements Recordable {
private static Logger logger = LoggerFactory.getLogger(BriefNote.class);
private Integer count = 0;
private Integer totalYears = 0;
@Override
public void addYear(String name, Integer years) {
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
count = count + 1;
totalYears = totalYears + years;
logger.info("seq:{}, {} add {} years, total {}", count, name, years, totalYears);
}
@Override
public String getResult() {
final StringBuffer sb = new StringBuffer("BriefNote{");
sb.append("count=").append(count);
sb.append(", totalYears=").append(totalYears);
sb.append('}');
return sb.toString();
}
}
package com.jiangjl.concurrency.actor.sharedmutable.domain;
import com.jiangjl.concurrency.spec.Employee;
import com.jiangjl.concurrency.utils.RandomUtils;
public class EmployeeWithNote implements Employee {
private final String name;
private final Integer workYear;
private final BriefNote briefNote;
public EmployeeWithNote(BriefNote briefNote) {
this.name = RandomUtils.randomName();
this.workYear = RandomUtils.randomYear();
this.briefNote = briefNote;
}
@Override
public String getName() {
return name;
}
@Override
public Integer getWorkYear() {
return workYear;
}
public BriefNote getBriefNote() {
return briefNote;
}
}
package com.jiangjl.concurrency.j.isolatedmutable;
import com.jiangjl.concurrency.spec.Employee;
import com.jiangjl.concurrency.utils.RandomUtils;
import java.util.concurrent.Callable;
public class IMEmployee implements Employee, Callable<Employee> {
private String name;
private Integer workYear;
public IMEmployee() {
this.name = RandomUtils.randomName();
this.workYear = RandomUtils.randomYear();
}
@Override
public String getName() {
return name;
}
@Override
public Integer getWorkYear() {
return workYear;
}
@Override
public Employee call() throws Exception {
return this;
}
}
package com.jiangjl.concurrency.j.isolatedmutable;
import com.jiangjl.concurrency.j.simple.SimpleWhiteBoard;
import com.jiangjl.concurrency.spec.Employee;
import com.jiangjl.concurrency.spec.Recordable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class IsolatedMutableMain {
private static ExecutorService executorService = Executors.newFixedThreadPool(10);
private static Logger logger = LoggerFactory.getLogger(IsolatedMutableMain.class);
public static void main(String[] args) throws InterruptedException {
Recordable whiteBoard = new SimpleWhiteBoard();
List<Future> futureList = new ArrayList<>();
for (int i = 0; i < 200; i++) {
futureList.add(executorService.submit(new IMEmployee()));
}
futureList.forEach(f -> {
try {
Employee employee = (Employee) f.get(200, TimeUnit.MILLISECONDS);
whiteBoard.addYear(employee.getName(), employee.getWorkYear());
} catch (InterruptedException | ExecutionException | TimeoutException e) {
e.printStackTrace();
}
});
logger.info(whiteBoard.getResult());
System.exit(1);
}
}
package com.jiangjl.concurrency.j.sharedmutable.lock;
import com.jiangjl.concurrency.spec.Recordable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.locks.ReentrantLock;
public class WhiteBoardWithLock implements Recordable {
private static Logger logger = LoggerFactory.getLogger(WhiteBoardWithLock.class);
private final ReentrantLock lock = new ReentrantLock();
private Integer count = 0;
private Integer totalYears = 0;
@Override
public void addYear(String name, Integer years) {
lock.lock();
try {
Thread.sleep(50);
count = count + 1;
totalYears = totalYears + years;
logger.info("seq:{}, {} add {} years, total {}", count, name, years, totalYears);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
@Override
public String getResult() {
return toString();
}
@Override
public String toString() {
final StringBuffer sb = new StringBuffer("WhiteBoardWithLock{");
sb.append("count=").append(count);
sb.append(", totalYears=").append(totalYears);
sb.append('}');
return sb.toString();
}
}
package com.jiangjl.concurrency.j.sharedmutable.sync;
import com.jiangjl.concurrency.spec.Recordable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class WhiteBoardWithSync implements Recordable {
private static Logger logger = LoggerFactory.getLogger(WhiteBoardWithSync.class);
private Integer count = 0;
private Integer totalYears = 0;
@Override
public synchronized void addYear(String name, Integer years) {
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
count = count + 1;
totalYears = totalYears + years;
logger.info("seq:{}, {} add {} years, total {}", count, name, years, totalYears);
}
@Override
public String getResult() {
return toString();
}
@Override
public String toString() {
final StringBuffer sb = new StringBuffer("WhiteBoardWithSync{");
sb.append("count=").append(count);
sb.append(", totalYears=").append(totalYears);
sb.append('}');
return sb.toString();
}
}
package com.jiangjl.concurrency.j.simple;
import com.jiangjl.concurrency.spec.Employee;
import com.jiangjl.concurrency.spec.Recordable;
import com.jiangjl.concurrency.utils.RandomUtils;
public class RunnableEmployee implements Employee, Runnable {
private String name;
private Integer workYear;
private Recordable whiteBoard;
public RunnableEmployee(Recordable whiteBoard) {
this.name = RandomUtils.randomName();
this.workYear = RandomUtils.randomYear();
this.whiteBoard = whiteBoard;
}
@Override
public String getName() {
return name;
}
@Override
public Integer getWorkYear() {
return workYear;
}
@Override
public void run() {
whiteBoard.addYear(name, workYear);
}
}
package com.jiangjl.concurrency.j.simple;
import com.jiangjl.concurrency.spec.Recordable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SimpleWhiteBoard implements Recordable {
private static Logger logger = LoggerFactory.getLogger(SimpleWhiteBoard.class);
private volatile Integer count = 0;
private volatile Integer totalYears = 0;
@Override
public void addYear(String name, Integer years) {
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
count = count + 1;
totalYears = totalYears + years;
logger.info("seq:{}, {} add {} years, total {}", count, name, years, totalYears);
}
@Override
public String getResult() {
final StringBuffer sb = new StringBuffer("SimpleWhiteBoard{");
sb.append("count=").append(count);
sb.append(", totalYears=").append(totalYears);
sb.append('}');
return sb.toString();
}
}
package com.jiangjl.concurrency.spec;
public interface Employee {
String getName();
Integer getWorkYear();
}
package com.jiangjl.concurrency.spec;
public interface Recordable {
void addYear(String name, Integer years);
String getResult();
}
package com.jiangjl.concurrency.stm.sharedmutable;
import com.jiangjl.concurrency.spec.Recordable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.stm.Ref;
import scala.concurrent.stm.japi.STM;
import static scala.concurrent.stm.japi.STM.afterCommit;
import static scala.concurrent.stm.japi.STM.afterRollback;
import static scala.concurrent.stm.japi.STM.atomic;
public class STMWhiteBoard implements Recordable {
private static Logger logger = LoggerFactory.getLogger(STMWhiteBoard.class);
private final Ref.View<Integer> countRef = STM.newRef(0);
private final Ref.View<Integer> totalYearRef = STM.newRef(0);
@Override
public void addYear(String name, Integer years) {
atomic(() -> {
try {
Thread.sleep(20);
} catch (InterruptedException e) {
e.printStackTrace();
}
Integer count = countRef.get() + 1;
Integer totalYears = totalYearRef.get() + years;
countRef.swap(count);
totalYearRef.swap(totalYears);
logger.info("seq:{}, {} add {} years, total {}", count, name, years, totalYears);
afterCommit(() -> logger.info("seq:{}, {} commit", count, name));
afterRollback(() -> logger.info("seq:{}, {} rollback", count, name));
}
);
}
@Override
public String getResult() {
final StringBuffer sb = new StringBuffer("SimpleWhiteBoard{");
sb.append("count=").append(countRef.get());
sb.append(", totalYears=").append(totalYearRef.get());
sb.append('}');
return sb.toString();
}
}
package com.jiangjl.concurrency.utils;
import java.util.UUID;
public class RandomUtils {
public static Integer randomYear() {
// Random random = new Random();
// return random.nextInt(10);
return 5;
}
public static String randomName() {
return UUID.randomUUID().toString().substring(0, 5);
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment