

1. 背景



3、使用Disruptor构建有界邮箱队列(1:可以减少队列对象的GC;2:Disruptor出队监听采用等待序列栅栏信号方式实现,相对传统while true的自旋等待方式可节省1核CPU);


2. 主要设计点

在考虑上述的那七八楼之前,其实我首先考虑的是到底用Springboot去做httpServer还是使用Netty,其实在我的心里Springboot + openFeign这套东西根本不适用于高并发的ToC场景,无奈的是基于各方面的原因,最终我还是妥协的选择了用Springboot的传统套路,不过我摒弃了openFeign,原因是由于想使用本地缓存,因此需要一致性路由,然后就自然的想到了RPC中基本的服务注册、发现、路由,然后索性打算去实现一个猴版的Akka邮箱机制,然后就继续想到了后续那2,3,4,5,6,7,8。

2.1 简化版Actor模型


2.2 同节点RPC不走网络


2.3 使用Disruptor构建有界邮箱队列

在最开始的实现中,我还是按照常规套路:LinkedBlockingQueue + while true自旋的方式去不断take,实现如下:

 * MailBox
 * @author chenx
public abstract class MailBox {

    protected final LinkedBlockingQueue<RoutableMessage<?>> queue;
    protected boolean isStart = true;

    protected MailBox(LinkedBlockingQueue<RoutableMessage<?>> queue) {
        this.queue = queue;

     * start
    public void start() {
        new Thread(() -> {
            while (this.isStart) {
                try {
                    RoutableMessage<?> routableMsg = this.queue.take();
                } catch (InterruptedException ex) {
                    log.error("MailBox.process() error!", ex);
                } catch (Exception e) {
                    log.error("MailBox.process() error!", e);

     * stop
    public void stop() {
        try {
            this.isStart = false;
        } catch (Exception e) {
            log.error("MailBox.stop() error!", e);

     * onMessageReceived
     * @param routableMsg
    public abstract void onMessageReceived(RoutableMessage<?> routableMsg);

     * onStop
    public abstract void onStop();

     * put
     * @param routableMsg
    public void put(RoutableMessage<?> routableMsg) {
        try {
        } catch (InterruptedException ex) {
            log.error("MailBox.put() error!", ex);
        } catch (Exception ex) {
            log.error("MailBox.put() error!", ex);

1、Disruptor出队监听采用等待序列栅栏信号方式实现,相对传统while true的自旋等待方式可节省1核CPU
关于Disruptor,之前写过一篇介绍文章: 先进先出的高性能的有界内存队列Disruptor简介,有兴趣的同学可以去扒拉一下。

2.4 邮箱实现代码

2.4.1 MailBox

import cn.bossfriday.chatbot.common.ChatbotException;
import cn.bossfriday.chatbot.core.message.RoutableMessage;
import cn.bossfriday.chatbot.utils.ThreadPoolUtils;
import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import lombok.extern.slf4j.Slf4j;

 * MailBox
 * @author chenx
public abstract class MailBox {

    protected Disruptor<MessageEvent> queue;
    protected EventTranslatorOneArg<MessageEvent, RoutableMessage<Object>> eventTranslator;
    protected RingBuffer<MessageEvent> ringBuffer;

    protected MailBox(int capacity) {
        this.queue = new Disruptor<>(
                new MessageEventFactory(),
                ThreadPoolUtils.getThreadFactory("mailBoxQueueThread", null),
                new YieldingWaitStrategy());
        this.queue.handleEventsWithWorkerPool(new MessageEventHandler());
        this.eventTranslator = new MessageEventTranslator();

     * start
    public void start() {
        this.ringBuffer = this.queue.start();
        if (this.ringBuffer == null) {
            throw new ChatbotException("MailBox.start() error!");

     * stop
    public void stop() {
        try {
        } catch (Exception e) {
            log.error("MailBox.stop() error!", e);

     * onMessageReceived
     * @param routableMsg
    public abstract void onMessageReceived(RoutableMessage<?> routableMsg);

     * onStop
    public abstract void onStop();

     * put
     * @param routableMsg
    public void put(RoutableMessage<?> routableMsg) {
        try {
            this.ringBuffer.publishEvent(this.eventTranslator, (RoutableMessage<Object>) routableMsg);
        } catch (Exception ex) {
            log.error("MailBox.put() error!", ex);

     * getMailBoxBufferSize: Ensure that ringBufferSize must be a power of 2
    private static int getMailBoxBufferSize(int num) {
        int size = 2;
        while (size < num) {
            size <<= 1;

        return size < 1024 ? 1024 : size;

     * MessageEvent
    public class MessageEvent {

        private RoutableMessage<Object> message;

        public RoutableMessage<Object> getMessage() {
            return this.message;

        public void setMessage(RoutableMessage<Object> message) {
            this.message = message;

     * MessageEventFactory
    public class MessageEventFactory implements EventFactory<MessageEvent> {

        public MessageEvent newInstance() {
            return new MessageEvent();

     * MessageEventTranslator
    public class MessageEventTranslator implements EventTranslatorOneArg<MessageEvent, RoutableMessage<Object>> {

        public void translateTo(MessageEvent messageEvent, long l, RoutableMessage<Object> routableMessage) {

     * MessageEventHandler
    public class MessageEventHandler implements WorkHandler<MessageEvent> {

        public void onEvent(MessageEvent messageEvent) {

2.4.2 MessageInBox

import cn.bossfriday.chatbot.common.ChatbotException;
import cn.bossfriday.chatbot.core.MessageDispatcher;
import cn.bossfriday.chatbot.core.client.ChatGptClient;
import cn.bossfriday.chatbot.core.message.RoutableImMessage;
import cn.bossfriday.chatbot.core.message.RoutableMessage;
import cn.bossfriday.chatbot.entity.ChatbotConfig;
import lombok.extern.slf4j.Slf4j;

 * MessageInBox
 * @author chenx
public class MessageInBox extends MailBox {

    private MessageDispatcher messageDispatcher;
    private ChatGptClient chatGptClient;

    public MessageInBox(ChatbotConfig config, MessageDispatcher messageDispatcher) {

        this.chatGptClient = new ChatGptClient("ChatGptPoolingClient", config, messageDispatcher);
        this.messageDispatcher = messageDispatcher;

    public void onMessageReceived(RoutableMessage msg) {
        this.messageDispatcher.getChatGptInvokerExecutor(msg.getHashCode()).execute(() -> {
            if (msg instanceof RoutableImMessage) {
                this.chatGptClient.send((RoutableImMessage) msg);
            } else {
                throw new ChatbotException("MessageInBox received an unsupported message!");

    public void onStop() {
        try {
        } catch (Exception e) {
            log.error("MessageInBox stop() error!", e);

2.4.3 MessageSendBox

import cn.bossfriday.chatbot.common.ChatbotException;
import cn.bossfriday.chatbot.core.MessageDispatcher;
import cn.bossfriday.chatbot.core.client.RoutableMessageClient;
import cn.bossfriday.chatbot.core.message.RoutableMessage;
import cn.bossfriday.chatbot.entity.ChatbotConfig;
import lombok.extern.slf4j.Slf4j;

import java.net.InetSocketAddress;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

 * MessageSendBox
 * @author chenx
public class MessageSendBox extends MailBox {

    private ChatbotConfig config;
    private MessageInBox inBox;
    private InetSocketAddress selfAddress;
    private ConcurrentHashMap<InetSocketAddress, RoutableMessageClient> clientMap = new ConcurrentHashMap<>();
    private MessageDispatcher messageDispatcher;

    public MessageSendBox(ChatbotConfig config, MessageInBox inBox, InetSocketAddress selfAddress, MessageDispatcher messageDispatcher) {

        this.config = config;
        this.inBox = inBox;
        this.selfAddress = selfAddress;
        this.messageDispatcher = messageDispatcher;

    public void onMessageReceived(RoutableMessage msg) {
        if (Objects.isNull(msg)) {
            throw new ChatbotException("The input routableMessage is null!");

        if (Objects.isNull(msg.getTargetServiceInstance())) {
            throw new ChatbotException("The input routableMessage.targetServiceInstance is null!");

        // local process need not network IO, enqueue directly. the localRouteNoNetwork config only just for testing.
        InetSocketAddress targetAddress = new InetSocketAddress(msg.getTargetServiceInstance().getHost(), msg.getTargetServiceInstance().getPort());
        if (this.selfAddress.equals(targetAddress) && this.config.isLocalRouteNoNetwork()) {

        // remote process
        if (!this.clientMap.containsKey(targetAddress)) {
            RoutableMessageClient client = new RoutableMessageClient(this.getPoolingClientName(targetAddress), this.config.getRestClientMaxTotal(), this.config.getRestClientMaxPerRoute());
            this.clientMap.putIfAbsent(targetAddress, client);

        this.messageDispatcher.getChatGptInvokerExecutor(msg.getHashCode()).execute(() -> this.clientMap.get(targetAddress).send(msg));

    public void onStop() {
        try {
            this.clientMap.forEach((key, value) -> value.shutdown());
            this.clientMap = new ConcurrentHashMap<>(8);
        } catch (Exception e) {
            log.error("MessageSendBox.stop() error!", e);

     * getName
     * @param targetAddress
     * @return
    private String getPoolingClientName(InetSocketAddress targetAddress) {
        return "RoutableMessagePoolingClient-" + targetAddress.toString();

2.5 使用Caffeine缓存用户最近会话



import com.beem.chat.robot.api.entity.ChatRobotConfig;
import com.beem.chat.robot.api.entity.im.ImMessage;
import com.beem.chat.robot.api.entity.request.OpenAiCompletionRequest;
import com.beem.chat.robot.api.exception.ChatbotException;
import com.beem.chat.robot.provider.common.ConcurrentCircularList;
import com.beem.chat.robot.provider.utils.ChatRobotUtils;
import com.beem.chat.robot.provider.utils.CircularListCodecUtils;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;

import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.TimeUnit;

import static com.beem.chat.robot.api.constant.ChatRobotConstant.SERVICE_AI_MODEL_TXT;
import static com.beem.chat.robot.api.constant.ChatRobotConstant.SERVICE_CHOICE_COUNT;

 * ChatContextCorrelator
 * @author chenx
public class ChatContextCorrelator {

    private Cache<String, byte[]> contextCache = null;
    private ChatRobotConfig config;

    public ChatContextCorrelator(ChatRobotConfig config) {
        this.config = config;
        this.contextCache = Caffeine.newBuilder()
                .expireAfterAccess(config.getContextCacheExpireSeconds(), TimeUnit.SECONDS)

     * getOpenAiCompletionRequest
     * @param message
     * @return
    public OpenAiCompletionRequest getOpenAiCompletionRequest(ImMessage message) throws IOException {
        String key = this.getKey(message);
        String value = ChatRobotUtils.getImMessageContent(message);
        ConcurrentCircularList<String> chatContextList = this.setContext(key, value);

        return this.buildOpenAiCompletionRequest(chatContextList);

     * @param message
     * @return
    public String getKey(ImMessage message) {
        if (Objects.isNull(message)) {
            throw new ChatbotException("The input ImMessage is null!");

        if (StringUtils.isEmpty(message.getBusChannel()) || StringUtils.isEmpty(message.getFromUserId())) {
            throw new ChatbotException("ImMessage.busChannel or fromUserId is empty!");

        return message.getBusChannel() + "-" + message.getFromUserId();

     * setContext
     * @param key
     * @param context
     * @return
    public ConcurrentCircularList<String> setContext(String key, String context) throws IOException {
        byte[] data = this.contextCache.getIfPresent(key);
        ConcurrentCircularList<String> circularList = ArrayUtils.isEmpty(data)
                ? new ConcurrentCircularList<>(this.config.getContextCacheRingCapacity())
                : CircularListCodecUtils.decodeStringList(data);
        this.contextCache.put(key, CircularListCodecUtils.encodeStringList(circularList, this.config.getContextCacheRingCapacity()));

        return circularList;

     * buildOpenAiCompletionRequest
     * @param chatContextList
     * @return
    private OpenAiCompletionRequest buildOpenAiCompletionRequest(ConcurrentCircularList<String> chatContextList) {
        if (chatContextList == null || chatContextList.isEmpty()) {
            throw new ChatbotException("chatContextList is null or empty!");

        StringBuilder sb = new StringBuilder();
        for (String content : chatContextList) {
            sb.append(content + " ");

        return OpenAiCompletionRequest.builder()

2.6 使用环形List的数据结构表达用户最近会话


import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

 * ConcurrentCircularList
 * @author chenx
public class ConcurrentCircularList<T> implements Iterable<T> {

    private Object[] elements;
    private int size;
    private int headIndex;
    private int tailIndex;
    private Lock lock = new ReentrantLock();

    public ConcurrentCircularList(int capacity) {
        if (capacity < 1) {
            throw new IllegalArgumentException("Capacity must be at least 1");

        this.elements = new Object[capacity];
        this.size = 0;
        this.headIndex = 0;
        this.tailIndex = 0;

     * add
     * @param element
    public void add(T element) {
        try {
            this.elements[this.tailIndex] = element;
            if (this.size == this.elements.length) {
                this.headIndex = (this.headIndex + 1) % this.elements.length;
            } else {

            this.tailIndex = (this.tailIndex + 1) % this.elements.length;
        } finally {

     * get
     * @param index
     * @return
    public T get(int index) {
        try {
            if (index < 0 || index >= this.size) {
                throw new IndexOutOfBoundsException("Index " + index + " is out of bounds");

            int i = (this.headIndex + index) % this.elements.length;
            return (T) this.elements[i];
        } finally {

     * size
     * @return
    public int size() {
        try {
            return this.size;
        } finally {

     * capacity
     * @return
    public int capacity() {
        return this.elements.length;

     * isEmpty
     * @return
    public boolean isEmpty() {
        try {
            return this.size == 0;
        } finally {

    public Iterator<T> iterator() {
        return new CircularListIterator();

    private class CircularListIterator implements Iterator<T> {

        private int current;
        private boolean removable;
        private int remaining;

        public CircularListIterator() {
            this.current = ConcurrentCircularList.this.headIndex;
            this.removable = false;
            this.remaining = ConcurrentCircularList.this.size;

        public boolean hasNext() {
            return this.remaining > 0;

        public T next() {
            if (!this.hasNext()) {
                throw new NoSuchElementException();

            T element = (T) ConcurrentCircularList.this.elements[this.current];
            this.removable = true;
            this.current = (this.current + 1) % ConcurrentCircularList.this.elements.length;

            return element;

        public void remove() {
            if (!this.removable) {
                throw new IllegalStateException();

            int deleteIndex = (this.current - 1 + ConcurrentCircularList.this.elements.length) % ConcurrentCircularList.this.elements.length;
            this.current = (this.current - 1 + ConcurrentCircularList.this.elements.length) % ConcurrentCircularList.this.elements.length;
            ConcurrentCircularList.this.elements[deleteIndex] = null;
            ConcurrentCircularList.this.headIndex = this.current;
            this.removable = false;

2.7 使用紧凑自定义方式序列化用户最近会话

不管用redis之类的分布式缓存中间件还是本地缓存去存储用户最近会话,都需要考虑尽量节省内存的问题。八股文中经常会看到什么一个空对象占用多少字节,对象头里有啥(对象hashCode、和GC相关的……),如何补码……,这个那个的,反正要想说的很全,真的需要提前准备。这里我采用ByteArrayInputStream和ByteArrayOutputStream对 CircularList< String >去做自定义的紧凑序列化,从下面的代码可以看出:除了String类型的会话内容外(当然utf8String自身的数据结构是前2字节存储字符串长度),只多用了2个字节(1个字节存储:circularListSize,1个字节存储circularListCapacity)。效率方面,我简单测试了一下:好像是1万次10条对话的环形list序列化+反序列化 80多毫秒左右吧(公司发的17年破笔记本运行)。

import cn.bossfriday.chatbot.common.ChatbotException;
import cn.bossfriday.chatbot.common.ConcurrentCircularList;
import lombok.experimental.UtilityClass;

import java.io.*;

import static cn.bossfriday.chatbot.common.ChatbotConstant.SERVICE_MAX_CONTEXT_CACHE_RING_SIZE;

 * CircularListCodecUtils
 * @author chenx
public class CircularListCodecUtils {

     * encodeStringList
     * @param circularList
     * @param circularListCapacity
     * @return
     * @throws IOException
    public static byte[] encodeStringList(ConcurrentCircularList<String> circularList, int circularListCapacity) throws IOException {
        if (circularList == null || circularList.isEmpty()) {
            throw new ChatbotException("The input chatContextList is null or empty!");

        try (ByteArrayOutputStream out = new ByteArrayOutputStream();
             DataOutputStream dos = new DataOutputStream(out)) {
            int circularListSize = circularList.size();
            if (circularListCapacity < 0 || circularListCapacity > SERVICE_MAX_CONTEXT_CACHE_RING_SIZE) {
                throw new IllegalArgumentException("The input chatContextList.size() must be between 0 and 256!");

            dos.writeByte((byte) circularListSize);
            dos.writeByte((byte) circularListCapacity);
            for (String item : circularList) {

            return out.toByteArray();

     * decodeStringList
     * @param bytes
     * @return
     * @throws IOException
    public static ConcurrentCircularList<String> decodeStringList(byte[] bytes) throws IOException {
        try (ByteArrayInputStream in = new ByteArrayInputStream(bytes);
             DataInputStream dis = new DataInputStream(in)) {

            int circularListSize = Byte.toUnsignedInt(dis.readByte());
            int circularListCapacity = Byte.toUnsignedInt(dis.readByte());
            if (circularListSize > circularListCapacity) {
                throw new IllegalArgumentException("circularListSize must <= circularListCapacity!");

            ConcurrentCircularList<String> circularList = new ConcurrentCircularList<>(circularListCapacity);
            for (int i = 0; i < circularListSize; i++) {

            return circularList;

2.8 使用Murmur64哈希算法实现一致性路由及线程一致性保障


import cn.bossfriday.chatbot.common.ChatbotException;
import org.apache.commons.lang3.StringUtils;

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;

 * MurmurHashUtils
 * @author chenx
public class MurmurHashUtils {

    private static final int INT_BYTE_LENGTH = 4;
    private static final int LONG_BYTE_LENGTH = 8;

    private MurmurHashUtils() {


     * hash64
     * @param key
     * @return
    public static long hash64(String key) {
        if (StringUtils.isEmpty(key)) {
            throw new ChatbotException("input key is null or empty!");

        return hash64(key.getBytes(StandardCharsets.UTF_8));

     * hash64
     * @param key
     * @return
    public static long hash64(byte[] key) {
        return hash64A(key, 0x1234ABCD);

     * hash32
     * @param key
     * @return
    public static int hash32(String key) {
        if (StringUtils.isEmpty(key)) {
            throw new ChatbotException("input key is null or empty!");

        return hash(key.getBytes(StandardCharsets.UTF_8), 0x1234ABCD);

     * hash32
     * @param key
     * @return
    public static int hash32(byte[] key) {
        return hash(key, 0x1234ABCD);

     * Hashes bytes in an array.
     * @param data The bytes to hash.
     * @param seed The seed for the hash.
     * @return The 32 bit hash of the bytes in question.
    public static int hash(byte[] data, int seed) {
        return hash(ByteBuffer.wrap(data), seed);

     * Hashes bytes in part of an array.
     * @param data   The data to hash.
     * @param offset Where to start munging.
     * @param length How many bytes to process.
     * @param seed   The seed to start with.
     * @return The 32-bit hash of the data in question.
    public static int hash(byte[] data, int offset, int length, int seed) {
        return hash(ByteBuffer.wrap(data, offset, length), seed);

     * Hashes the bytes in a buffer from the current position to the limit.
     * @param buf  The bytes to hash.
     * @param seed The seed for the hash.
     * @return The 32 bit murmur hash of the bytes in the buffer.
    public static int hash(ByteBuffer buf, int seed) {
        // save byte order for later restoration
        ByteOrder byteOrder = buf.order();

        int m = 0x5bd1e995;
        int r = 24;

        int h = seed ^ buf.remaining();

        int k;
        while (buf.remaining() >= INT_BYTE_LENGTH) {
            k = buf.getInt();

            k *= m;
            k ^= k >>> r;
            k *= m;

            h *= m;
            h ^= k;

        if (buf.remaining() > 0) {
            ByteBuffer finish = ByteBuffer.allocate(INT_BYTE_LENGTH).order(ByteOrder.LITTLE_ENDIAN);
            h ^= finish.getInt();
            h *= m;

        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;

        return h;

     * hash64A
     * @param data
     * @param seed
     * @return
    public static long hash64A(byte[] data, int seed) {
        return hash64A(ByteBuffer.wrap(data), seed);

     * hash64A
     * @param data
     * @param offset
     * @param length
     * @param seed
     * @return
    public static long hash64A(byte[] data, int offset, int length, int seed) {
        return hash64A(ByteBuffer.wrap(data, offset, length), seed);

     * hash64A
     * @param buf
     * @param seed
     * @return
    public static long hash64A(ByteBuffer buf, int seed) {
        ByteOrder byteOrder = buf.order();

        long m = 0xc6a4a7935bd1e995L;
        int r = 47;

        long h = seed ^ (buf.remaining() * m);

        long k;
        while (buf.remaining() >= LONG_BYTE_LENGTH) {
            k = buf.getLong();

            k *= m;
            k ^= k >>> r;
            k *= m;

            h ^= k;
            h *= m;

        if (buf.remaining() > 0) {
            ByteBuffer finish = ByteBuffer.allocate(LONG_BYTE_LENGTH).order(ByteOrder.LITTLE_ENDIAN);
            h ^= finish.getLong();
            h *= m;

        h ^= h >>> r;
        h *= m;
        h ^= h >>> r;


        return h;

2.9 确保与ChatGPT的交互一定是一问一答的串行方式


import cn.bossfriday.chatbot.entity.ChatbotConfig;
import cn.bossfriday.chatbot.utils.ThreadPoolUtils;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.ExecutorService;

 * MessageDispatcher
 * @author chenx
public class MessageDispatcher {

    private int chatGptDispatcherThreadSize;
    private int imServerDispatcherThreadSize;

    private ExecutorService[] chatGptInvokerThreads = null;
    private ExecutorService[] imServerDispatcherThreads = null;

    public MessageDispatcher(ChatbotConfig config) {
        this.chatGptDispatcherThreadSize = config.getChatGptDispatcherThreadSize();
        this.imServerDispatcherThreadSize = config.getImServerDispatcherThreadSize();

        this.chatGptInvokerThreads = new ExecutorService[this.chatGptDispatcherThreadSize];
        this.imServerDispatcherThreads = new ExecutorService[this.imServerDispatcherThreadSize];

        for (int i = 0; i < this.chatGptDispatcherThreadSize; i++) {
            this.chatGptInvokerThreads[i] = ThreadPoolUtils.getSingleThreadExecutor("chatGptInvokerThread-" + i);

        for (int i = 0; i < this.imServerDispatcherThreadSize; i++) {
            this.imServerDispatcherThreads[i] = ThreadPoolUtils.getSingleThreadExecutor("imServerDispatcherThread-" + i);

        log.info("MessageDispatcher init done.");

     * getChatGptInvokerExecutor
     * @param hashCode
     * @return
    public ExecutorService getChatGptInvokerExecutor(Long hashCode) {
        return this.chatGptInvokerThreads[getHashIndex(hashCode, this.chatGptDispatcherThreadSize)];

     * getImServerInvokerExecutor
     * @param hashCode
     * @return
    public ExecutorService getImServerInvokerExecutor(Long hashCode) {
        return this.imServerDispatcherThreads[getHashIndex(hashCode, this.chatGptDispatcherThreadSize)];

     * getHashIndex
     * @param hashCode
     * @param threadSize
     * @return
    private static int getHashIndex(Long hashCode, int threadSize) {
        return Math.toIntExact(Math.abs(hashCode) % threadSize);

3. 总结


3.1 Mock挡板情况下运行效果


3.2 上游服务Mock挡板&真实接入ChatGPT运行





昔人已乘黄鹤去,此地空余黄鹤楼。 黄鹤一去不复返,白云千载空悠悠。 山远天高烟水寒,相思枫叶丹。 暮雁关山无故人,孤帆一片日边斜。






