您的位置 首页 java

java实现Socket群聊

/**

* 实现群聊

* 服务端向每个客户端一对一通信,将一个客户端的消息发送给其他所有人,客户端与客户端之间没有直接联系

*/

public class GroupChatServer {

public static String msg;

//Sender线程和Receiver线程的缓冲区,Receiver线程收到某个客户端的消息,放入 msg ,唤醒Sender线程群发消息

public static void main(String[] args) {

try(ServerSocket ss = new Server socket (8888)) {

System.out.println(“群聊服务器已启动,等待客户端连接”);

while (true){

Socket socket = ss.accept();

System.out.println(“有新的客户端连接: “+socket.getInetAddress());

new Receiver2(socket).start();

new Sender2( Socket ).start();

}

} catch (IO Exception e) {

throw new RuntimeException(e);

}

}

}

class Sender2 extends Thread{

//向客户端一对一发送消息

final Socket socket;

public Sender2(Socket socket) {

this.socket = socket;

}

@Override

public void run() {

try(PrintWriter pw = new PrintWriter(socket.getOutputStream(),true)) {

while (true){

synchronized (“”){

//所有收发线程同步””

“”.wait();

//每次先等待,当有人说话了再唤醒,发送消息

}

if (socket.isClosed()||socket.isOutput Shutdown ())break;

//如果Receiver线程.readLine返回null即对方Socket对象已关闭,则关闭己方Socket对象,Sender线程检测到Socket关闭退出循环结束线程

pw.println(GroupChatServer.msg);

//服务器对每个客户端一对一发送,将这一个消息群发给所有客户端,因为只是读取msg而不会改变它所以不需要同步

}

} catch (IOException | InterruptedException e) {

throw new RuntimeException(e);

}finally {

if (socket != null) {

try {

socket.close();

//在Receiver线程也会finally socket.close() close方法可以多次执行,方法内判断如果isClosed直接return,不会产生异常

} catch (IOException e) {

throw new RuntimeException(e);

}

}

}

}

}

class Receiver2 extends Thread{

//与客户端一对一接收消息

final Socket socket;

public Receiver2(Socket socket) {

this.socket = socket;

}

@Override

public void run() {

try(BufferedReader br = new BufferedReader(new InputStream Reader(socket.getInputStream()))) {

synchronized (“”){

GroupChatServer.msg = “用户”+socket.getInetAddress()+”加入群聊”;

//每当有新用户加入时在群内提示

“”.notifyAll();

}

while (true){

String newMsg = br.readLine();

if (newMsg == null){

System.out.println(“用户”+socket.getInetAddress()+”结束通信”);

synchronized (“”){

GroupChatServer.msg = “用户”+socket.getInetAddress()+”退出群聊”;

“”.notifyAll();

}

break;

}

synchronized (“”){

//同步””每次只能一个Receiver线程存msg

GroupChatServer.msg = “用户”+socket.getInetAddress()+”说: “+newMsg;

“”.notifyAll();

//.notifyAll()唤醒所有””.wait()阻塞的Sender线程,每个Sender线程将msg发送给客户端后再次wait

}

}

} catch (IOException e) {

throw new RuntimeException(e);

}finally {

if (socket!=null){

try {

socket.close();

//Receiver线程先发现用户结束通信,唤醒Sender线程后break执行close,这时Sender还在就绪状态排队,所以Receiver线程会更早关闭socket

} catch (IOException e) {

throw new RuntimeException(e);

}

}

}

}

}

class GCS2{

//解决:当多个Receiver同时收到客户端的消息,在Sender唤醒但还没有得到CPU资源或部分 线程 没得到CPU资源发送消息时,下一个Receiver将msg更改,部分用户漏收消息的可能性

static String msg;

static Socket microphone;

//缓存发送msg的Socket

final static ArrayList<Socket> group = new ArrayList<>();

//组员容器

final static ArrayList<PrintWriter> pws = new ArrayList<>();

//输出流容器

public static void add(Socket socket){

group.add(socket);

try {

pws.add(new PrintWriter(socket.getOutputStream(),true));

} catch (IOException e) {

throw new RuntimeException(e);

}

}

public static void main(String[] args) {

try(ServerSocket ss = new ServerSocket(8888)) {

new Sender3().start();

//用一个Sender线程处理所有输出流,每次将msg发送给所有人之后再让Receiver传新的msg

while (true){

Socket socket = ss.accept();

add(socket);

new Receiver3(socket).start();

}

} catch (IOException e) {

throw new RuntimeException(e);

}

}

}

class Receiver3 extends Thread{

final Socket socket;

public Receiver3(Socket socket) {

this.socket = socket;

}

@Override

public void run() {

try(BufferedReader br = new BufferedReader(new InputStreamReader(socket.getInputStream()))) {

push(“用户”+socket.getInetAddress()+”:”+socket.getPort()+”加入群聊”);

//push()方法用于更新msg,单独写方法体便于复用

while (true){

String nMsg = br.readLine();

if (nMsg == null) {

push(“用户”+socket.getInetAddress()+”:”+socket.getPort()+”退出群聊”);

break;

}

push(“用户”+socket.getInetAddress()+”:”+socket.getPort()+”说: “+ nMsg);

//客户端发送给服务端的信息已经被缓存在Socket中了,每次. readLine ()是读取本地缓存中的内容,也就是说即便Receiver线程因为排队更新msg而阻塞,也不会漏掉客户端的某段msg

}

} catch (IOException | InterruptedException e) {

throw new RuntimeException(e);

} finally {

if (socket != null) {

try {

socket.close();

} catch (IOException e) {

throw new RuntimeException(e);

}

}

}

}

private void push(String msg) throws InterruptedException {

synchronized (“r”){

if (GCS2.msg!=null){

“r”.wait();

//当Sender没有取走msg时.wait

}

GCS2.msg = msg;

GCS2.microphone = socket;

}

synchronized (“s”){

“s”.notify();

//唤醒Sender线程,Receiver线程在”r”.wait()阻塞,Sender线程在”s”.wait()阻塞,如果是同一个对象锁的话使用notify()无法保证唤醒的是Sender

}

}

}

class Sender3 extends Thread{

@Override

public void run() {

while (true){

synchronized (“s”){

if (GCS2.msg==null) {

try {

“s”.wait();

} catch (InterruptedException e) {

throw new RuntimeException(e);

}

}

}

checkConnection();

//将已经关闭的Socket从group移除并关闭PW输出流

for (int i = 0;i<GCS2.group.size();i++){

if (GCS2.group.get(i)!=GCS2.microphone){

GCS2.pws.get(i).println(GCS2.msg);

}

}

synchronized (“r”){

GCS2.msg=null;

GCS2.microphone = null;

“r”.notify();

//唤醒一个因为”r”.wait()阻塞的Receiver线程更新信息

}

}

}

private void checkConnection(){

ArrayList<Integer> list = new ArrayList<>();

for (int i = GCS2.group.size()-1;i>=0;i–){

if (GCS2.group.get(i).isClosed()||GCS2.group.get(i).isOutputShutdown()){

list.add(i);

}

}

for (int i :

list) {

GCS2.group.remove(i);

GCS2.pws.get(i).close();

GCS2.pws.remove(i);

//从后往前删,每次删除不会影响下一个要删的元素的 索引

}

}

}

class GCS3{

final static String[] history = new String[99];

//使用数组缓存历史记录

static int cursor;

public static void push(String msg){

synchronized (history){

//存的时候串行

history[cursor++]=msg;

if (cursor==99){

cursor=0;

}

}

synchronized (“s”){

“s”.notifyAll();

}

}

public static void main(String[] args) {

try(ServerSocket ss = new ServerSocket(8888)) {

System.out.println(“服务器启动”);

while (true){

Socket socket = ss.accept();

new Receiver4(socket).start();

new Sender4(socket).start();

}

} catch (IOException e) {

throw new RuntimeException(e);

}

}

}

class Sender4 extends Thread{

final Socket socket;

int index;

public Sender4(Socket socket) {

this.socket = socket;

this.index = GCS3.cursor;

}

@Override

public void run() {

try(PrintWriter pw = new PrintWriter(socket.getOutputStream(),true)) {

while (true){

synchronized (“s”){

if (index==GCS3.cursor){

“s”.wait();

}

}

//取的时候并行

if (socket.isClosed()||socket.isOutputShutdown())break;

for (int c = GCS3.cursor;index!=c;index=(index+1)%99){

pw.println(GCS3.history[index]);

}

}

} catch (IOException | InterruptedException e) {

throw new RuntimeException(e);

}

}

}

class Receiver4 extends Thread{

final Socket socket;

public Receiver4(Socket socket) {

this.socket = socket;

}

@Override

public void run() {

try(BufferedReader br = new BufferedReader(new InputStreamReader(socket.getInputStream()))) {

GCS3.push(“用户”+socket.getInetAddress()+”:”+socket.getPort()+”加入群聊”);

while (true){

String msg = br.readLine();

if (msg == null) {

GCS3.push(“用户”+socket.getInetAddress()+”:”+socket.getPort()+”退出群聊”);

break;

}

GCS3.push(“用户”+socket.getInetAddress()+”:”+socket.getPort()+”说: “+msg);

}

} catch (IOException e) {

throw new RuntimeException(e);

}

try {

socket.close();

} catch (IOException e) {

throw new RuntimeException(e);

}

}

}

class GCS5 {

static int total;

//当前通信的连接总数

static int count;

//对一段msg完成发送的Sender数

static String msg = “加入群聊”;

static Socket microphone;

public static void totalUp(){

synchronized(“total”){

total++;

}

}

public static void totalDown(){

synchronized (“total”){

total–;

//修改同一数据时需要串行

}

}

public static void countUp(){

synchronized (“count”){

count++;

}

}

public static void push(String s,Socket socket){

if (GCS5.count<GCS5.total) {

synchronized (“r”) {

try {

“r”.wait();

//每次放入新的消息前先等待,等着前一句全部发送完毕后被唤醒

boolean b;

} catch (InterruptedException e) {

throw new RuntimeException(e);

}

}

}

synchronized (“count”){

count=0;

msg = s;

microphone = socket;

}

synchronized (“s”){

“s”.notifyAll();

}

}

public static void main(String[] args) {

try(ServerSocket ss = new ServerSocket(8888)) {

System.out.println(“已启动”);

while (true){

Socket socket = ss.accept();

System.out.println(“新的连接”);

new Receiver5(socket).start();

new Sender5(socket).start();

}

} catch (IOException e) {

throw new RuntimeException(e);

}

}

}

class Receiver5 extends Thread{

final Socket socket;

public Receiver5(Socket socket) {

this.socket = socket;

}

@Override

public void run() {

try(BufferedReader br = new BufferedReader(new InputStreamReader(socket.getInputStream()))) {

GCS5.push(“用户”+socket.getInetAddress()+”:”+socket.getPort()+”加入群聊”,socket);

while (true){

String msg = br.readLine();

if (msg == null) {

GCS5.push(“用户”+socket.getInetAddress()+”:”+socket.getPort()+”退出群聊”,socket);

break;

}

GCS5.push(“用户”+socket.getInetAddress()+”:”+socket.getPort()+”说: “+msg,socket);

}

} catch (IOException e) {

throw new RuntimeException(e);

}finally {

if (socket != null) {

try {

socket.close();

} catch (IOException e) {

throw new RuntimeException(e);

}

}

}

GCS5.totalDown();

}

}

class Sender5 extends Thread{

final Socket socket;

public Sender5(Socket socket) {

this.socket = socket;

}

@Override

public void run() {

try(PrintWriter pw = new PrintWriter(socket.getOutputStream(),true)) {

pw.println(“加入群聊”);

GCS5.totalUp();

//Sender线程运行后再total++,使创建到运行这期间count不会卡在total-1

while (!socket.isClosed()){

GCS5.countUp();

if (socket!=GCS5.microphone){

pw.println(GCS5.msg);

}

//先count++,如果Receiver线程正在更新msg,更新后count增加的1是新的msg的发送量

if (GCS5.count>=GCS5.total){

synchronized(“r”){

“r”.notify();

//当Sender都发送完之后唤醒一个.wait()的Receiver。但如果这时没有.wait()的Receiver,所有Sender全部.wait(),再收到新的msg时Receiver需要自己判断count>=total不执行.wait()

}

}

synchronized (“s”){

“s”.wait();

}

}

} catch (IOException | InterruptedException e) {

throw new RuntimeException(e);

}

}

}

文章来源:智云一二三科技

文章标题:java实现Socket群聊

文章地址:https://www.zhihuclub.com/174716.shtml

关于作者: 智云科技

热门文章

网站地图