您的位置 首页 php

开源史上最轻量级消息队列服务:httpsqs介绍和实践

HTTPSQS是国内著名软件工程师张宴的作品, 曾在金山有广泛应用, 常用来进行异步解耦操作,如发送手机短信、发送电子邮件等场景。

HTTPSQS(HTTP Simple Queue Service)是一款基于HTTP GET/POST协议的轻量级开源简单消息队列服务,使用Tokyo Cabinet的B+Tree Key/Value数据库来做数据的持久化存储。

HTTPSQS具有以下特征:

  • 非常简单,基于HTTP GET/POST 协议,支持HTTP协议的编程语言均可调用
  • 非常快速,入队列、出队列速度超过10000次/秒
  • 支持多队列
  • 单个队列支持的最大队列数量高达10亿条
  • 低内存消耗,海量数据存储,存储几十GB的数据只需不到100MB的物理内存缓冲区
  • 可以在不停止服务的情况下便捷地修改单个队列的最大队列数量
  • 可以实时查看队列状态(入队列位置、出队列位置、未读队列数量、最大队列数量)
  • 十分轻, 源代码不超过800行

安装

  • 安装 libevent

libvent官网:

# 下载安装包
wget 
# 解压
tar zxvf libevent-2.1.10-stable.tar.gz
cd libevent-2.1.10-stable
# 配置编译参数
./configure --prefix=/usr/local/libevent/
# 编译
make
# 安装
sudo make install
 
  • 安装tokyocabinet

tokyocabinet官网:

# 下载安装包
wget 
tar zxvf tokyocabinet-1.4.48.tar.gz
cd tokyocabinet
# 配置编译参数
./configure --prefix=/usr/local/tokyocabinet/
# 编译
make
# 安装
sudo make install
 
  • 安装httpsqs
# 解压安装包
tar zxvf httpsqs-1.7.tar.gz
# 修改libevent和tokyocabinet的安装路径
vim MakeFile 
# 编译
make
# 安装
sudo make install # 安装到了/usr/bin下
# 习惯将安装路径放在/usr/local目录下
sudo mkdir -p /usr/local/httpsqs/bin/
sudo mkdir -p /usr/local/httpsqs/tmp/
sudo mv /usr/bin/httpsqs /usr/local/httpsqs/bin/
 
  • 启动https: /usr/local/httpsqs/bin/httpsqs -d -p 端口号 -x 数据存放目录
  • 关闭httpsqs: 直接kill掉httpsqs进程

HTTPSQS管理脚本

个人比较习惯使用类似service start/stop的命令来管理各项服务,下面分享一个httpsqs管理脚本:

#!/bin/bash 
# 快速启动httpsqs脚本
# 
EXEC=/usr/local/httpsqs/bin/httsqs 
PORT=1218
PIDFILE=/usr/local/httpsqs/tmp/httpsqs.pid
DATADIR=/usr/local/httpsqs/data
case "$1" in 
 start) 
 if [ -f $PIDFILE ] 
 then 
  echo  "$PIDFILE exists, process is already running or crashed" 
 else
 echo "Starting httpsqs ..." 
 ulimit -SHn 65535 
 $EXEC -d -p $PORT -x $DATADIR -i $PIDFILE
 fi 
 ;; 
 stop) 
 if [ ! -f $PIDFILE ] 
 then 
 echo "$PIDFILE does not exist, process is not running" 
 else 
 PID=$(cat $PIDFILE) 
 kill $PID
  while  [ -x /proc/${PID} ] 
 do 
 echo "Waiting for httpsqs to shutdown ..." 
 sleep 1 
 done 
 echo "httpsqs stopped" 
 fi 
 ;; 
 *) 
 echo "Please use start or stop as first argument" 
 ;; 
esac
 

HTTPSQS的使用

  • 入列

入列: 将文本消息放入队列

以curl命令为例:

curl "经过URL编码的文本消息&auth=mypass123"
 

返回数据:

  • HTTPSQS_PUT_OK: 入列成功
  • HTTPSQS_PUT_ERROR: 入列失败
  • HTTPSQS_PUT_END: 队列已满
  • 出列

出列: 从队列中取出文本消息

以curl命令为例:

curl ""
 

返回消息队列的内容给客户端, 如果没有未取出的消息队列,则返回:HTTPSQS_GET_END

  • 查看队列状态

以curl命令为例:

curl ""
 

返回数据示例:

{"name":"xoyo","maxqueue":1000000,"putpos":45,"putlap":1,"getpos":6,"getlap":1,"unread":39}
 

HTTPSQS客户端

由于HTTPSQS是基于HTTP协议实现通信的,可以依据接口,自行编写library。本文分享基于PHP CI框架的HTTPSQS客户端类:

<?php if (!defined('BASEPATH')) exit('No direct script access allowed');
/*
----------------------------------------------------------------------------------------------------------------
HTTP Simple Queue Service - httpsqs client class for PHP v1.7.1
Author: Zhang Yan ( E-mail: net@s135.com
This is free software, and you are welcome to modify and redistribute it under the New BSD License
----------------------------------------------------------------------------------------------------------------
Useage:
<?php
include_once("httpsqs_client.php");
$httpsqs = new httpsqs($httpsqs_host, $httpsqs_port, $httpsqs_auth, $httpsqs_charset);
$result = $httpsqs->put($queue_name, $queue_data); //1. PUT text message into a queue. If PUT successful, return boolean: true. If an error occurs, return boolean: false. If queue full, return text: HTTPSQS_PUT_END
$result = $httpsqs->get($queue_name); //2. GET text message from a queue. Return the queue contents. If an error occurs, return boolean: false. If there is no unread queue message, return text: HTTPSQS_GET_END
$result = $httpsqs->gets($queue_name); //3. GET text message and pos from a queue. Return example: array("pos" => 7, "data" => "text message"). If an error occurs, return boolean: false. If there is no unread queue message, return: array("pos" => 0, "data" => "HTTPSQS_GET_END")
$result = $httpsqs->status($queue_name); //4. View queue status
$result = $httpsqs->status_json($queue_name); //5. View queue status in json. Return example: {"name":"queue_name","maxqueue":5000000,"putpos":130,"putlap":1,"getpos":120,"getlap":1,"unread":10}
$result = $httpsqs->view($queue_name, $queue_pos); //6. View the contents of the specified queue pos (id). Return the contents of the specified queue pos.
$result = $httpsqs->reset($queue_name); //7. Reset the queue. If reset successful, return boolean: true. If an error occurs, return boolean: false
$result = $httpsqs->maxqueue($queue_name, $num); //8. Change the maximum queue length of per-queue. If change the maximum queue length successful, return boolean: true. If it be cancelled, return boolean: false
$result = $httpsqs->synctime($num); //9. Change the interval to sync updated contents to the disk. If change the interval successful, return boolean: true. If it be cancelled, return boolean: false
?>
----------------------------------------------------------------------------------------------------------------
*/class Httpsqs
{
 public $httpsqs_host;
 public $httpsqs_port;
 public $httpsqs_auth;
 public $httpsqs_charset;
 protected $_ci;
 public function __construct($params=array()) {
 $this->_ci = & get_instance();
 
 $this->httpsqs_host = isset($params['host']) ? $params['host'] : $this->_ci->config->item('httpsqs_host');
 $this->httpsqs_port = isset($params['port']) ? $params['port'] : $this->_ci->config->item('httpsqs_port');
 $this->httpsqs_auth = isset($params['auth']) ? $params['auth'] : $this->_ci->config->item('httpsqs_auth');
 $this->httpsqs_charset = isset($params['charset']) ? $params['charset'] : $this->_ci->config->item('httpsqs_charset');
 return true;
 }
 public function http_get($query)
 {
 $ socket  = fsockopen($this->httpsqs_host, $this->httpsqs_port, $errno, $errstr, 5);
 if (!$socket)
 {
 return false;
 }
 $ header  = '';
 $pos_value = 0;
 $host = $this->httpsqs_host;
 $out = "GET ${query} HTTP/1.1\r\n";
 $out .= "Host: ${host}\r\n";
 $out .= "Connection:  close \r\n";
 $out .= "\r\n";
 fwrite($socket, $out);
 $line =  trim ( fgets ($socket));
 $header .= $line;
 list($proto, $rcode, $result) =  explode (" ", $line);
 $len = -1;
 while (($line = trim(fgets($socket))) != "")
 {
 $header .= $line;
 if (strstr($line, "Content-Length:"))
 {
 list($cl, $len) = explode(" ", $line);
 
 }
 if (strstr($line, "Pos:"))
 {
 list($pos_key, $pos_value) = explode(" ", $line);
 } 
 if (strstr($line, "Connection: close"))
 {
 $close = true;
 }
 }
 if ($len < 0)
 {
 return false;
 }
 
 $body = fread($socket, $len);
 $fread_times = 0;
 while(strlen($body) < $len){
 $body1 = fread($socket, $len);
 $body .= $body1;
 unset($body1);
 if ($fread_times > 100) {
 break;
 }
 $fread_times++;
 }
 //if ($close)  fclose ($socket);
 fclose($socket);
 $result_array["pos"] = (int)$pos_value;
 $result_array["data"] = $body;
 return $result_array;
 }
 public function http_post($query, $body)
 { 
 $socket = fsockopen($this->httpsqs_host, $this->httpsqs_port, $errno, $errstr, 1);
 if (!$socket)
 {
 return false;
 }
 $header = '';
 $host = $this->httpsqs_host; 
 $out = "POST ${query} HTTP/1.1\r\n";
 $out .= "Host: ${host}\r\n";
 $out .= "Content-Length: " . strlen($body) . "\r\n";
 $out .= "Connection: close\r\n";
 $out .= "\r\n";
 $out .= $body;
 fwrite($socket, $out);
 $line = trim(fgets($socket));
 $header .= $line;
 list($proto, $rcode, $result) = explode(" ", $line);
 $len = -1;
 while (($line = trim(fgets($socket))) != "")
 {
 $header .= $line;
 if (strstr($line, "Content-Length:"))
 {
 list($cl, $len) = explode(" ", $line);
 }
 if (strstr($line, "Pos:"))
 {
 list($pos_key, $pos_value) = explode(" ", $line);
 } 
 if (strstr($line, "Connection: close"))
 {
 $close = true;
 }
 }
 if ($len < 0)
 {
 return false;
 }
 $body = @fread($socket, $len);
 //if ($close) fclose($socket);
 fclose($socket);
 $result_array["pos"] = (int)$pos_value;
 $result_array["data"] = $body;
 return $result_array;
 }
 
 public function put($queue_name, $queue_data)
 {
 $result = $this->http_post("/?auth=".$this->httpsqs_auth."&charset=".$this->httpsqs_charset."&name=".$queue_name."&opt=put", $queue_data);
 if ($result["data"] == "HTTPSQS_PUT_OK") {
 return true;
 } else if ($result["data"] == "HTTPSQS_PUT_END") {
 return $result["data"];
 }
 return false;
 }
 
 public function get($queue_name)
 {
 $result = $this->http_get("/?auth=".$this->httpsqs_auth."&charset=".$this->httpsqs_charset."&name=".$queue_name."&opt=get");
 if ($result == false || $result["data"] == "HTTPSQS_ERROR" || $result["data"] == false) {
 return false;
 }
 return $result["data"];
 }
 
 public function gets($queue_name)
 {
 $result = $this->http_get("/?auth=".$this->httpsqs_auth."&charset=".$this->httpsqs_charset."&name=".$queue_name."&opt=get");
 if ($result == false || $result["data"] == "HTTPSQS_ERROR" || $result["data"] == false) {
 return false;
 }
 return $result;
 } 
 
 public function status($queue_name)
 {
 $result = $this->http_get("/?auth=".$this->httpsqs_auth."&charset=".$this->httpsqs_charset."&name=".$queue_name."&opt=status");
 if ($result == false || $result["data"] == "HTTPSQS_ERROR" || $result["data"] == false) {
 return false;
 }
 return $result["data"];
 }
 
 public function view($queue_name, $queue_pos)
 {
 $result = $this->http_get("/?auth=".$this->httpsqs_auth."&charset=".$this->httpsqs_charset."&name=".$queue_name."&opt=view&pos=".$pos);
 if ($result == false || $result["data"] == "HTTPSQS_ERROR" || $result["data"] == false) {
 return false;
 }
 return $result["data"];
 }
 
 public function reset($queue_name)
 {
 $result = $this->http_get("/?auth=".$this->httpsqs_auth."&charset=".$this->httpsqs_charset."&name=".$queue_name."&opt=reset");
 if ($result["data"] == "HTTPSQS_RESET_OK") {
 return true;
 }
 return false;
 }
 
 public function maxqueue($queue_name, $num)
 {
 $result = $this->http_get("/?auth=".$this->httpsqs_auth."&charset=".$this->httpsqs_charset."&name=".$queue_name."&opt=maxqueue&num=".$num);
 if ($result["data"] == "HTTPSQS_MAXQUEUE_OK") {
 return true;
 }
 return false;
 }
 
 public function status_json($queue_name)
 {
 $result = $this->http_get("/?auth=".$this->httpsqs_auth."&charset=".$this->httpsqs_charset."&name=".$queue_name."&opt=status_json");
 if ($result == false || $result["data"] == "HTTPSQS_ERROR" || $result["data"] == false) {
 return false;
 }
 return $result["data"];
 }
 public function synctime($num)
 {
 $result = $this->http_get("/?auth=".$this->httpsqs_auth."&charset=".$this->httpsqs_charset."&name=httpsqs_synctime&opt=synctime&num=".$num);
 if ($result["data"] == "HTTPSQS_SYNCTIME_OK") {
 return true;
 }
 return false;
 }
}
?>
 

其他HTTPSQS用法可参考:

文章来源:智云一二三科技

文章标题:开源史上最轻量级消息队列服务:httpsqs介绍和实践

文章地址:https://www.zhihuclub.com/151807.shtml

关于作者: 智云科技

热门文章

网站地图