高性能日志框架 Log4a 原理分析

  • 2018-01-17
  • 1,363
  • 3

本文主要分析 Android 端基于 mmap 的高性能日志框架 Log4a 的实现原理。

项目地址:https://github.com/pqpo/Log4a(求star)

前言

对于移动开发者来说,针对一些用户反馈难以复现的线上问题,分析日志有时候是解决问题的必要手段。 但是日志的收集一直有个痛点,就是性能与日志完整性无法兼得。 要实现高性能的日志收集,势必要使用大量内存,先将日志写入内存中,然后在合适的时机将内存里的日志写入到文件系统中(flush), 如果在 flush 之前用户强杀了进程,那么内存里的内容会因此而丢失。 日志实时写入文件可以保证日志的完整性,但是写文件是 IO 操作,涉及到用户态与内核态的切换,而且这种开销是开启线程都无法避免的,也就是说即使开启一个新线程实时写入也是相对耗时的。

关于读写文件涉及到用户态和内核态切换推荐阅读《Linux探秘之用户态与内核态》,总之要减少读写文件的次数,类比线程上下文的切换所带来的开销。那么,如何才能既减少读写文件次数,又防止断电造成内存数据丢失?Log4a 正是为了解决这个问题而诞生的,使用 mmap 文件映射内存作为缓存,可以在不牺牲性能的前提下最大化的保证日志的完整性。

日志首先会写入到 mmap 文件映射内存中,基于 mmap 的特性,即使用户强杀了进程,日志文件也不会丢失,并且会在下次初始化 Log4a 的时候回写到日志文件中。当然这并不是首创,微信开源的 mars 框架中的 xlog 模块也是基于 mmap 特性实现的,考虑到 xlog 模块功能比较多,使用比较复杂,同时也是出于学习的目的而写了 Log4a。

mmap 是什么

mmap是一种内存映射文件的方法,即将一个文件或者其它对象映射到进程的地址空间,实现文件磁盘地址和进程虚拟地址空间中一段虚拟地址的一一对映关系。实现这样的映射关系后,进程就可以采用指针的方式读写操作这一段内存,而系统会自动回写脏页面到对应的文件磁盘上,即完成了对文件的操作而不必再调用read,write等系统调用函数。相反,内核空间对这段区域的修改也直接反映用户空间,从而可以实现不同进程间的文件共享。(http://www.cnblogs.com/huxiao-tee/p/4660352.html)

mmap 内存映射文件之后,可以直接通过操作内存来读写文件,性能上接近直接读写内存。针对一次写文件,节省了用户态到内核态切换的开销,也减少了数据拷贝的次数。

Log4a 核心方法分析

对于 Android 端的日志框架,开源项目不少,大部分都是 Java 上层封装来实现更美观的输出,这并不是本文的重点,事实上,如何高效的写文件才是下面要讨论的。核心类就一个 me.pqpo.librarylog4a.LogBuffer  ,  可以接入到任意现有的日志框架中来实现高效的文件日志记录。LogBuffer 的代码不多就全部贴出了了:


public class LogBuffer {

    private static final String TAG = "LogBuffer";
    private long ptr = 0;
    private String logPath;
    private String bufferPath;
    private int bufferSize;

    public LogBuffer(String bufferPath, int capacity, String logPath) {
        this.bufferPath = bufferPath;
        this.bufferSize = capacity;
        this.logPath = logPath;
        try {
            ptr = initNative(bufferPath, capacity, logPath);
        }catch (Exception e) {
            Log.e(TAG, Log4a.getStackTraceString(e));
        }
    }
    public String getLogPath() {
        return logPath;
    }
    public String getBufferPath() {
        return bufferPath;
    }
    public int getBufferSize() {
        return bufferSize;
    }
    public void write(String log) {
        if (ptr != 0) {
            try {
                writeNative(ptr, log);
            }catch (Exception e) {
                Log.e(TAG, Log4a.getStackTraceString(e));
            }
        }
    }
    public void flushAsync() {
        if (ptr != 0) {
            try {
                flushAsyncNative(ptr);
            }catch (Exception e) {
                Log.e(TAG, Log4a.getStackTraceString(e));
            }
        }
    }
    public void release() {
        if (ptr != 0) {
            try {
                releaseNative(ptr);
            }catch (Exception e) {
                Log.e(TAG, Log4a.getStackTraceString(e));
            }
            ptr = 0;
        }
    }
    static {
        System.loadLibrary("log4a-lib");
    }
    private native static long initNative(String bufferPath, int capacity, String logPath);
    private native void writeNative(long ptr, String log);
    private native void flushAsyncNative(long ptr);
    private native void releaseNative(long ptr);
}

主要方法就3个,对应3个 native 方法:写文件(writeNative),异步刷新(flushAsyncNative)和释放资源(releaseNative),再加上一个在构造方法中调用的初始化方法(initNative)。

初始化

初始化方法 initNative 接受3个参数,分别是缓存文件的路径,缓存文件的大小,日志的路径,返回参数为 native 层的一个对象指针,用于调用其他的三个方法。


static jlong initNative(JNIEnv *env, jclass type, jstring buffer_path_,
           jint capacity, jstring log_path_) {
    const char *buffer_path = env->GetStringUTFChars(buffer_path_, 0);
    const char *log_path = env->GetStringUTFChars(log_path_, 0);
    const size_t buffer_size = static_cast(capacity);
    // 打开缓存文件
    int buffer_fd = open(buffer_path, O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
    // 打开日志文件
    int log_fd = open(log_path, O_RDWR|O_CREAT|O_APPEND, S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
    // buffer 的第一个字节会用于存储日志路径名称长度,后面紧跟日志路径,之后才是日志信息
    if (strlen(log_path) > CHAR_MAX / 2) {
        jclass je = env->FindClass("java/lang/IllegalArgumentException");
        std::ostringstream oss;
        oss << "The length of log path must be less than " << CHAR_MAX / 2;
        env -> ThrowNew(je, oss.str().c_str());
        return 0;
    }
    // 初始化异步文件刷新
    if (fileFlush == nullptr) {
        fileFlush = new AsyncFileFlush(log_fd);
    }
    char *buffer_ptr = openMMap(buffer_fd, buffer_size);
    bool map_buffer = true;
    //如果打开 mmap 失败,则降级使用内存缓存
    if(buffer_ptr == nullptr) {
        buffer_ptr = new char[capacity];
        map_buffer = false;
    }
    env->ReleaseStringUTFChars(buffer_path_, buffer_path);
    env->ReleaseStringUTFChars(log_path_, log_path);
    LogBuffer* logBuffer = new LogBuffer(buffer_ptr, buffer_size);
    //将buffer内的数据清0, 并写入日志文件路径
    logBuffer->initData(log_path);
    logBuffer->map_buffer = map_buffer;
    return reinterpret_cast(logBuffer);
}

1. 首先分别打开缓存文件和日志文件,其中打开日志文件对了一个标志位 O_APPEND,该标志位表示以追加的方式写文件,并且系统级保证多线程写文件的安全性。另外 buffer 文件的储存结构为:第一个字节会用于存储日志路径名称长度,后面紧跟日志路径,之后才是日志信息。比如:0x47/storage/emulated/0/Android/data/me.pqpo.log4a/files/log/2018_01_17.txt
2. 然后初始化 AsyncFileFlush ,该类会以异步的方式将文件刷新到日志文件中。
3. 之后调用 openMMap 拿到缓存文件映射为内存后的地址 *buffer_ptr, 通过这个指针操作内存就相当于读写缓存文件了。
如果 mmap 开启失败则回退到使用普通内存缓存。
4. 接着初始化 native 层的 LogBuffer。
5. 最后将 LogBuffer 的指针返回给 Java 层。

下面是 openMMap 函数:


static char* openMMap(int buffer_fd, size_t buffer_size) {
    char* map_ptr = nullptr;
    if (buffer_fd != -1) {
        // 写脏数据
        writeDirtyLogToFile(buffer_fd);
        // 根据 buffer size 调整 buffer 文件大小
        ftruncate(buffer_fd, static_cast(buffer_size));
        lseek(buffer_fd, 0, SEEK_SET);
        map_ptr = (char *) mmap(0, buffer_size, PROT_WRITE | PROT_READ, MAP_SHARED, buffer_fd, 0);
        if (map_ptr == MAP_FAILED) {
            map_ptr = nullptr;
        }
    }
    return map_ptr;
}

1. 回写上次因断电(泛指,包括强杀进程)来不及写到日志文件中的脏数据
2. 根据 buffer size, 使用 ftruncate 调整 buffer 文件大小
3. 使用 mmap 创建文件内存映射

然后看回写脏数据的函数 writeDirtyLogToFile:


static void writeDirtyLogToFile(int buffer_fd) {
    struct stat fileInfo;
    if(fstat(buffer_fd, &fileInfo) >= 0) {
        size_t buffered_size = static_cast(fileInfo.st_size);
        if(buffered_size > 0) {
            char *buffer_ptr_tmp = (char *) mmap(0, buffered_size, PROT_WRITE | PROT_READ, MAP_SHARED, buffer_fd, 0);
            if (buffer_ptr_tmp != MAP_FAILED) {
                LogBuffer tmp(buffer_ptr_tmp, buffered_size);
                size_t data_size = tmp.dataSize();
                if (data_size > 0) {
                    char* log_path = tmp.getLogPath();
                    if (log_path != nullptr) {
                        int log_fd = open(log_path, O_RDWR|O_CREAT|O_APPEND, S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
                        if(log_fd != -1) {
                            AsyncFileFlush tmpFlush(log_fd);
                            tmp.async_flush(&tmpFlush);
                        }
                        delete[] log_path;
                    }
                }
            }
        }
    }
}

1. 拿到原文件大小,如果原文件中有内容则开始写入
2. 使用 mmap 映射文件内存,初始化 LogBuffer,AsyncFileFlush
3. 异步回写脏数据 tmp.async_flush(&tmpFlush)
LogBuffer 与 AsyncFileFlush 的实现原理后面再说

写文件

写文件是 LogBuffer 和 AsyncFileFlush 协作完成的,协作过程如下:


static void writeNative(JNIEnv *env, jobject instance, jlong ptr,
            jstring log_) {
    const char *log = env->GetStringUTFChars(log_, 0);
    LogBuffer* logBuffer = reinterpret_cast(ptr);
    size_t log_size = strlen(log);
    // 缓存写不下时异步刷新
    if (log_size >= logBuffer->emptySize()) {
        logBuffer->async_flush(fileFlush);
    }
    logBuffer->append(log);
    env->ReleaseStringUTFChars(log_, log);
}

先会判断缓存够不够写入新日志,如果不够写入,会调用 async_flush 将原缓存异步写到日志文件中,这个方法会清空原有的缓存,最后将新的日志写入到缓存中中。
写看异步写文件 AsyncFileFlush 的实现:
AsyncFileFlush.h:


class AsyncFileFlush {
public:
    AsyncFileFlush(int log_fd);
    ~AsyncFileFlush();
    bool async_flush(char *data);
    void stopFlush();
private:
    void async_log_thread();
    ssize_t flush(char *data);
    bool exit = false;
    int log_fd;
    std::vector async_buffer;
    std::thread async_thread;
    std::condition_variable async_condition;
    std::mutex async_mtx;
};

实现其实就是一个生产消费模型,有一个线程读取数组内待写入的日志,如果有日志就写入,没有就等待。外部写入时唤醒写入线程写入。节选主要逻辑如下:


AsyncFileFlush::AsyncFileFlush(int log_fd):log_fd(log_fd) {
    async_thread = std::thread(&AsyncFileFlush::async_log_thread, this);
}

AsyncFileFlush::~AsyncFileFlush() {
    stopFlush();
}

void AsyncFileFlush::async_log_thread() {
    while (true) {
        std::unique_lock lck_async_log_thread(async_mtx);
        while (!async_buffer.empty()) {
            char* data = async_buffer.back();
            flush(data);
            async_buffer.pop_back();
            delete[] data;
        }
        if (exit) {
            return;
        }
        async_condition.wait(lck_async_log_thread);
    }
}

bool AsyncFileFlush::async_flush(char *data) {
    std::unique_lock lck_async_flush(async_mtx);
    if (exit) {
        return false;
    }
    async_buffer.push_back(data);
    async_condition.notify_all();
    return true;
}

在构造方法里启动了一个消费者线程,在析构函数中停止。在写文件线程中是一个循环,读取数组中的日志内容并调用 flush 写入日志文件中,最后等待。
async_flush 中会将日志内容放入数组中并通知写入线程开始工作。

最后是 native 层 LogBuffer 的实现:
LogBuffer.h


class LogBuffer {
public:
    LogBuffer(char* ptr, size_t capacity);
    ~LogBuffer();

    void initData(const char *log_path);
    char* dataCopy();
    size_t dataSize();
    size_t append(const char* log);
    void clear();
    void release();
    size_t emptySize();
    char *getLogPath();
    bool async_flush(AsyncFileFlush *fileFlush);

public:
    bool map_buffer = true;

private:
    char* const buffer_ptr = nullptr;
    char* data_ptr = nullptr;
    char* write_ptr = nullptr;

    size_t buffer_size = 0;
    std::recursive_mutex log_mtx;

};

需要注意的是最后的三个指针,buffer_ptr,data_ptr,write_ptr。
buffer_ptr 是缓存文件映射内存的指针,前面说过缓存文件的格式为:第一个字节会用于存储日志路径名称长度,后面紧跟日志路径,之后才是日志信息
data_ptr 指向的就是日志内容开始的地方。write_ptr 指向的是目前写内存的起始位置。知道这几个指针的作用之后就容易理解了。
下面按照头文件的函数一个个分析:


LogBuffer::LogBuffer(char *ptr, size_t buffer_size):
        buffer_ptr(ptr),
        buffer_size(buffer_size) {
    data_ptr = buffer_ptr + 1 + buffer_ptr[0];
    write_ptr = data_ptr + strlen(data_ptr);
}
LogBuffer::~LogBuffer() {
    release();
}
void LogBuffer::initData(const char *log_path) {
    std::lock_guard lck_release(log_mtx);
    memset(buffer_ptr, '\0', buffer_size);
    size_t log_path_len = strlen(log_path);
    buffer_ptr[0] = static_cast(log_path_len);
    memcpy(buffer_ptr + 1, log_path, log_path_len);
    data_ptr = buffer_ptr + 1 + log_path_len;
    write_ptr = data_ptr;
}
void LogBuffer::release() {
    std::lock_guard lck_release(log_mtx);
    if(map_buffer) {
        munmap(buffer_ptr, buffer_size);
    } else {
        delete[] buffer_ptr;
    }
}

构造函数中初始化了上面说的3个指针,其中 data_ptr = buffer_ptr + 1 + buffer_ptr[0]; 还是那句话:第一个字节会用于存储日志路径名称长度,后面紧跟日志路径,之后才是日志信息。稍微理解一下就明白了。initData 函数就是初始化 buffer 的数据格式。 write_ptr 为 data_ptr 加上原内容的长度实现之后写入追加的目的。
析构函数中释放资源,释放资源有两种情况,一种是使用 mmap 缓存的,一种是 mmap 开启失败使用普通内存缓存的。


size_t LogBuffer::dataSize() {
    return write_ptr - data_ptr;
}
size_t LogBuffer::emptySize() {
    return buffer_size - (write_ptr - buffer_ptr);
}
char *LogBuffer::dataCopy() {
    size_t str_len = dataSize() + 1;  //'\0'
    char* data = new char[str_len];
    memcpy(data, data_ptr, str_len);
    data[str_len - 1] = '\0';
    return data;
}
void LogBuffer::clear() {
    std::lock_guard lck_clear(log_mtx);
    write_ptr = data_ptr;
    memset(write_ptr, '\0', emptySize());
}
char *LogBuffer::getLogPath() {
    size_t path_len = static_cast(buffer_ptr[0]);
    char* file_path = nullptr;
    if(path_len > 0) {
        file_path = new char[path_len + 1];
        memcpy(file_path, buffer_ptr + 1, path_len);
        file_path[path_len] = '\0';
    }
    return file_path;
}

数据大小与空闲空间大小通过指针就能很快计算出来。数据拷贝通过 memcpy 操作指针来实现拷贝到新数组中,清空缓存使用 memset 将内存置0。使用 getLogPath 取得保存在缓存头的日志路径。所有操作都是针对内存地址实现的,方便且高效。

下面是新增日志信息:


size_t LogBuffer::append(const char *log) {
    std::lock_guard lck_append(log_mtx);
    size_t len = strlen(log);
    size_t freeSize = emptySize();
    size_t writeSize = len <= freeSize ? len : freeSize;
    memcpy(write_ptr, log, writeSize);
    write_ptr += writeSize;
    return writeSize;
}

计算新传入的日志长度,再计算缓存空闲空间,通过 memcpy 直接写入到 buffer 内存中,最后移动写指针 write_ptr 并返回写入长度。在日志缓存不够写入的时候,会造成写入的数据不完整,调用层需要自行判断写入长度与实际长度来重新写入未写入的数据。

当日志缓存写满之后,应该调用刷新缓存:


bool LogBuffer::async_flush(AsyncFileFlush *fileFlush) {
    std::lock_guard lck_clear(log_mtx);
    if (dataSize() > 0) {
        char *data = dataCopy();
        if(fileFlush->async_flush(data)) {
            clear();
            return true;
        } else {
            delete[] data;
            return false;
        }
    }
    return false;
}

刷新缓存会取出缓存的内容,再调用 AsyncFileFlush 异步写入到日志文件中,然后清理缓存空间供新日志写入。

有问题欢迎留言交流,如觉得不错请点个 star 以表支持。

>> 转载请注明来源:高性能日志框架 Log4a 原理分析

●非常感谢您的阅读,欢迎订阅微信公众号(右边扫一扫)以表达对我的认可与支持,我会在第一时间同步文章到公众号上。当然也可点击下方打赏按钮为我打赏。

免费分享,随意打赏

感谢打赏!
微信
支付宝

评论

发表评论