首页云计算KVstore :键值映射存储服务器

KVstore :键值映射存储服务器

时间2024-07-26 09:02:05发布ongwu分类云计算浏览50

概述:本文介绍kv存储服务,所谓kv即key-value映射,用户存储键值对,提供:1.根据键查找值 2.根据键修改值 3.根据键删除值 效果:kv存储是运行在服务器上的一个进程,客户端通过套接字与服务器上的kvstore进程进行通信,客户端发送由协议规定的请求例如 SET name01 wjq ,kvstore服务器接收到请求并解析,回复结果 SUCCESS; 又例如客户端发送 GET name01 ,接收到服务端的回复 wjq

实现思路:

1.首先我们需要做到kvstore与客户端通信,这里使用tcp,也就是说设计之初kvstore就是一个支持百万级并发连接的tcp服务器:这里使用一个reactor模型,直接附上代码,tcp服务器不在本文讲解范围内

#include <sys/socket.h> #include <errno.h> #include <netinet/in.h> #include <stdio.h> #include <string.h> #include <unistd.h> #include <pthread.h> #include <sys/poll.h> #include <sys/epoll.h> #include <sys/time.h> #include "kvstore.h" // listenfd // EPOLLIN --> int accept_cb(int fd); // clientfd // int recv_cb(int fd); int send_cb(int fd); // conn, fd, buffer, callback int epfd = 0; struct conn_item connlist[1048576] = {0}; // 1024 2G 2 * 512 * 1024 * 1024 // list struct timeval zvoice_king; // // 1000000 #define TIME_SUB_MS(tv1, tv2) ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000) int set_event(int fd, int event, int flag) { if (flag) { // 1 add, 0 mod struct epoll_event ev; ev.events = event ; ev.data.fd = fd; epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev); } else { struct epoll_event ev; ev.events = event; ev.data.fd = fd; epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev); } } int accept_cb(int fd) { struct sockaddr_in clientaddr; socklen_t len = sizeof(clientaddr); int clientfd = accept(fd, (struct sockaddr*)&clientaddr, &len); if (clientfd < 0) { return -1; } set_event(clientfd, EPOLLIN, 1); connlist[clientfd].fd = clientfd; memset(connlist[clientfd].rbuffer, 0, BUFFER_LENGTH); connlist[clientfd].rlen = 0; memset(connlist[clientfd].wbuffer, 0, BUFFER_LENGTH); connlist[clientfd].wlen = 0; connlist[clientfd].recv_t.recv_callback = recv_cb; connlist[clientfd].send_callback = send_cb; if ((clientfd % 1000) == 999) { struct timeval tv_cur; gettimeofday(&tv_cur, NULL); int time_used = TIME_SUB_MS(tv_cur, zvoice_king); memcpy(&zvoice_king, &tv_cur, sizeof(struct timeval)); printf("clientfd : %d, time_used: %d\n", clientfd, time_used); } return clientfd; } int recv_cb(int fd) { // fd --> EPOLLIN char *buffer = connlist[fd].rbuffer; int idx = connlist[fd].rlen; int count = recv(fd, buffer, BUFFER_LENGTH, 0); if (count == 0) { printf("disconnect\n"); epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL); close(fd); return -1; } connlist[fd].rlen = count; kvstore_request(&connlist[fd]); connlist[fd].wlen = strlen(connlist[fd].wbuffer); set_event(fd, EPOLLOUT, 0); return count; } int send_cb(int fd) { char *buffer = connlist[fd].wbuffer; int idx = connlist[fd].wlen; int count = send(fd, buffer, idx, 0); set_event(fd, EPOLLIN, 0); return count; } int init_server(unsigned short port) { int sockfd = socket(AF_INET, SOCK_STREAM, 0); struct sockaddr_in serveraddr; memset(&serveraddr, 0, sizeof(struct sockaddr_in)); serveraddr.sin_family = AF_INET; serveraddr.sin_addr.s_addr = htonl(INADDR_ANY); serveraddr.sin_port = htons(port); if (-1 == bind(sockfd, (struct sockaddr*)&serveraddr, sizeof(struct sockaddr))) { perror("bind"); return -1; } listen(sockfd, 10); return sockfd; } int epoll_entry(void) { int port_count = 20; unsigned short port = 2048; int i = 0; epfd = epoll_create(1); // int size for (i = 0;i < port_count;i ++) { int sockfd = init_server(port + i); // 2048, 2049, 2050, 2051 ... 2057 connlist[sockfd].fd = sockfd; connlist[sockfd].recv_t.accept_callback = accept_cb; set_event(sockfd, EPOLLIN, 1); } gettimeofday(&zvoice_king, NULL); struct epoll_event events[1024] = {0}; while (1) { // mainloop(); int nready = epoll_wait(epfd, events, 1024, -1); // int i = 0; for (i = 0;i < nready;i ++) { int connfd = events[i].data.fd; if (events[i].events & EPOLLIN) { // int count = connlist[connfd].recv_t.recv_callback(connfd); //printf("recv count: %d <-- buffer: %s\n", count, connlist[connfd].rbuffer); } else if (events[i].events & EPOLLOUT) { // printf("send --> buffer: %s\n", connlist[connfd].wbuffer); int count = connlist[connfd].send_callback(connfd); } } } //getchar(); //close(clientfd); }
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210 函数epoll_entry实现了与客户端之间的通信,并通过kvstore_request(&connlist[fd])这个函数实现了处理客户端请求,并将处理结果发送给客户端

2.kvstore存储引擎的实现

概要:由于服务器要将客户端请求存储的内容存储起来,有两种方式,一是存储数据库,二是存储服务端本地

为了简单实现业务,本文使用存储到本地进行讲解,采用的数据结构是哈希表

先介绍哈希表的实现以及为kvstore封装的接口

/* * 单线程版本,没有做线程安全! * */ #include <stdio.h> #include <string.h> #include <stdlib.h> #include <pthread.h> #include "kvstore.h" #define MAX_KEY_LEN 128 #define MAX_VALUE_LEN 512 #define MAX_TABLE_SIZE 102400 #define ENABLE_POINTER_KEY 1 typedef struct hashnode_s { // hash node #if ENABLE_POINTER_KEY char *key; char *value; #else char key[MAX_KEY_LEN]; char value[MAX_VALUE_LEN]; #endif struct hashnode_s *next; } hashnode_t; typedef struct hashtable_s { // hash table hashnode_t **nodes; // hashnode_t * 类型的 *nodes,也就是存放着hashnode_t类型的指针的数组nodes int max_slots; int count; } hashtable_t; hashtable_t Hash; static int _hash(char *key, int size) { // hash函数,使用key确定hash值 if (!key) return -1; int sum = 0; int i = 0; while (key[i] != 0) { // 使用ASCII计算hash值,由于key是字符数组,该方法通用 sum += key[i]; i ++; } return sum % size; // 返回hash值 } hashnode_t *_create_node(char *key, char *value) { hashnode_t *node = (hashnode_t *)kvstore_malloc(sizeof(hashnode_t)); if (!node) return NULL; // malloc filed #if ENABLE_POINTER_KEY // 为节点的成员分配空间 node->key = kvstore_malloc(strlen(key) + 1); if (!node->key) { kvstore_free(node); // node分配成功但key失败 return NULL; } strcpy(node->key, key); node->value = kvstore_malloc(strlen(value) + 1); if (!node->value) { kvstore_free(node->key); // node和key分配成功但value失败 kvstore_free(node); return NULL; } strcpy(node->value, value); #else strncpy(node->key, key, MAX_KEY_LEN); strncpy(node->value, value, MAX_VALUE_LEN); #endif // 初始化 next node->next = NULL; return node; } int init_hashtable(hashtable_t *hash) { if (!hash) return -1; hash->nodes = (hashnode_t**)kvstore_malloc(sizeof(hashnode_t *) * MAX_TABLE_SIZE); if (!hash->nodes) return -1; hash->max_slots = MAX_TABLE_SIZE; hash->count = 0; return 0; } void dest_hashtable(hashtable_t *hash) { // 销毁哈希表 if (!hash) return; // 遍历释放数组中所有链表 int i = 0; for (i = 0; i < hash->max_slots; i++) { hashnode_t *node = hash->nodes[i]; while (node != NULL) { hashnode_t *tmp = node; // 保存当前节点 node = node->next; // 移动到下一个节点 hash->nodes[i] = node; // 更新头指针,在这段代码中没有作用 kvstore_free(tmp); // 释放当前节点 } } kvstore_free(hash->nodes); // 释放哈希表的数组成员 } int put_kv_hashtable(hashtable_t *hash, char *key, char *value) { if (!hash || !key || !value) return -1; int idx = _hash(key, MAX_TABLE_SIZE); // 哈希值作为数组下标 hashnode_t *node = hash->nodes[idx]; // 获取正确数组位置的头指针 #if 1 while (node != NULL) { // 如果已经存在,直接退出,不重复插入 if (strcmp(node->key, key) == 0) { return 1; } node = node->next; } #endif hashnode_t *new_node = _create_node(key, value); // 头插法 new_node->next = hash->nodes[idx]; hash->nodes[idx] = new_node; // 更新头节点指针 hash->count ++; return 0; } char *get_kv_hashtable(hashtable_t *hash, char *key) { // search if (!hash || !key) return NULL; int idx = _hash(key, MAX_TABLE_SIZE); hashnode_t *node = hash->nodes[idx]; // 确定数组索引 while (node != NULL) { // 遍历查找 if (strcmp(node->key, key) == 0) { return node->value; } node = node->next; } return NULL; } int count_kv_hashtable(hashtable_t *hash) { return hash->count; } int delete_kv_hashtable(hashtable_t *hash, char *key) { // 根据key删除节点 if (!hash || !key) return -1; int idx = _hash(key, MAX_TABLE_SIZE); // 哈希值作为索引 // 先判断头指针 hashnode_t *head = hash->nodes[idx]; if (head == NULL) return -1; // 遍历链表 hashnode_t *cur = hash->nodes; hashnode_t *prev = NULL; while (cur != NULL) { if (strcmp(cur->key, key) == 0) break; prev = cur; cur = cur->next; } if (cur == NULL) return -1; // 没找到 if (prev == NULL) { // 如果要删除的是头节点 hash->nodes[idx] = cur->next; // 删除cur } else { // 不是头节点 prev->next = cur->next; // 删除cur } // 释放cur节点的空间 #if ENABLE_POINTER_KEY if (cur->key) { kvstore_free(cur->key); } if (cur->value) { kvstore_free(cur->value); } kvstore_free(cur); #else free(cur); #endif hash->count --; // 更新count return 0; } int exit_kv_hashtable(hashtable_t *hash, char *key) { // 判断是否存在该key的映射value char *value = get_kv_hashtable(hash, key); if (value) return 1; else return 0; } int kvs_hash_modify(hashtable_t *hash, char *key, char *value) { // 先查找key再修改value if (!hash || !key || !value) return -1; int idx = _hash(key, MAX_TABLE_SIZE); hashnode_t *node = hash->nodes[idx]; while (node != NULL) { if (strcmp(node->key, key) == 0) { // 先释放原空间,避免内存泄漏 kvstore_free(node->value); // 释放原value指向的空间 node->value = NULL; // 避免使用悬空指针 // 新分配空间 node->value = kvstore_malloc(strlen(value) + 1); if (node->value) { // 分配成功 strcpy(node->value, value); return 0; } else assert(0); } node = node->next; } return -1; } int kvs_hash_count(hashtable_t *hash) { return hash->count; } // 再封装一层接口:使用第三方库时,对库函数进行一层封装,适配自己的代码 // 排查问题更新迭代时只需要修改这一层接口的内容就行,不需要在源代码主体上修改,相当于做了一层隔离 int kvstore_hash_craete(hashtable_t *hash) { return init_hashtable(hash); } void kvstore_hash_destory(hashtable_t *hash) { return dest_hashtable(hash); } int kvs_hash_set(hashtable_t *hash, char *key, char *value) { return put_kv_hashtable(hash, key, value); } char *kvs_hash_get(hashtable_t *hash, char *key) { return get_kv_hashtable(hash, key); } int kvs_hash_delete(hashtable_t *hash, char *key) { return delete_kv_hashtable(hash, key); }
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318

对于哈希表的设计与实现,注释说的很清楚了,最后封装的接口是用在接下来的kvstore主程序中的

3.kvstore主体

概要:这份代码集成了前面的tcp服务epoll_entry、存储组件哈希表以及最后要介绍的:对客户端请求进行解析处理的组件

先介绍kvstore主程序

int init_kvengine(void) { kvstore_hash_create(&Hash); } int exit_kvengine(void) { kvstore_hash_destory(&Hash); } int main() { init_kvengine(); // 创建存储引擎,这里是哈希表 epoll_entry(); // 启动tcp服务器,处理并回复客户端请求 exit_kvengine(); // 销毁哈希表 }
1234567891011121314151617181920212223242526

而这里调用的init_kvengine();实际上就是前面的哈希表代码中的:

int init_hashtable(hashtable_t *hash) { if (!hash) return -1; hash->nodes = (hashnode_t**)kvstore_malloc(sizeof(hashnode_t *) * MAX_TABLE_SIZE); if (!hash->nodes) return -1; hash->max_slots = MAX_TABLE_SIZE; hash->count = 0; return 0; } 123456789101112

4.请求解析

我们对于kvstore主程序中的存储引擎、tcp服务都介绍完了,接下来介绍最核心的请求解析函数:

这两个函数位于epoll_entry的kvstore_request(&connlist[fd])函数中:

int kvstore_request(struct conn_item *item) { char *msg = item->rbuffer; char *tokens[KVSTORE_MAX_TOKENS]; int count = kvstore_split_token(msg, tokens); // 解析请求 kvstore_parser_protocol(item, tokens, count); // 生成回复内容 return 0; } 1234567891011

这个函数做到了对用户请求的解析以及回复,而依赖的是以下两个函数:

解析请求: int kvstore_split_token(char *msg, char **tokens) { // 将msg字符串进行分割,结果保存在tokens字符串数字里 if (msg == NULL || tokens == NULL) return -1; // 参数检查 int idx = 0; char *token = strtok(msg, " "); // 对msg按空格“ ”进行分割,返回第一个子字符串 while (token != NULL) { // 获取剩余的子字符串 tokens[idx++] = token; // 将子字符串保存在字符串数组里 token = strtok(NULL, " "); // 固定写法,依次获取除第一个外,剩余的子字符串 } return idx; // 返回子字符串的个数 }
12345678910111213141516

我们能对用户请求按空格进行分割的原因是,kvstore规定了应用协议,只有按协议规定发送的请求才能被正确处理,就像linux shell 中的命令的名称以及使用方法一样

处理并回复: int kvstore_parser_protocol(struct conn_item *item, char **tokens, int count) { if (item == NULL || tokens[0] == NULL || count == 0) return -1; // 检查参数 char *msg = item->wbuffer; // 获取写缓冲区 memset(msg, 0, BUFFER_LENGTH); // 对用户的命令的解析结果, 例如 SET name wjq 解析结果如下: char *command = tokens[0]; // SET char *key = tokens[1]; // name char *value = tokens[2]; // wjq int cmd = KVS_CMD_START; for (cmd = KVS_CMD_START; cmd < KVS_CMD_SIZE; cmd++) { // 查找比对tokens里的命令 if (strcmp(commands[cmd], command) == 0) { break; // 找到了或者不存在 } } // 匹配命令并回复结果 switch (cmd) { case KVS_CMD_HSET: { // SET :添加 int res = kvstore_hash_set(key, value); // 调用哈希表的函数 if (!res) { snprintf(msg, BUFFER_LENGTH, "SUCCESS"); } else { snprintf(msg, BUFFER_LENGTH, "FAILED"); } break; } case KVS_CMD_HGET: { // GET :查询 char *val = kvstore_hash_get(key); // 调用哈希表提供的接口 if (val) { snprintf(msg, BUFFER_LENGTH, "%s", val); } else { snprintf(msg, BUFFER_LENGTH, "NO EXIST"); } break; } case KVS_CMD_HDEL: { // DEL : 删除 int res = kvstore_hash_delete(key); if (res < 0) { // server snprintf(msg, BUFFER_LENGTH, "%s", "ERROR"); } else if (res == 0) { snprintf(msg, BUFFER_LENGTH, "%s", "SUCCESS"); } else { snprintf(msg, BUFFER_LENGTH, "NO EXIST"); } break; } case KVS_CMD_HMOD: { // MOD : 修改 int res = kvstore_hash_modify(key, value); if (res < 0) { // server snprintf(msg, BUFFER_LENGTH, "%s", "ERROR"); } else if (res == 0) { snprintf(msg, BUFFER_LENGTH, "%s", "SUCCESS"); } else { snprintf(msg, BUFFER_LENGTH, "NO EXIST"); } break; } case KVS_CMD_HCOUNT: { // COUNT : 查询数量 int count = kvstore_hash_count(); if (count < 0) { // server snprintf(msg, BUFFER_LENGTH, "%s", "ERROR"); } else { snprintf(msg, BUFFER_LENGTH, "%d", count); } break; } default: { printf("cmd: %s\n", commands[cmd]); assert(0); } } }
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394

可以看到解析查询的过程就是将用户按我们指定协议输入的请求,分成几段,为每一条请求进行一次解析、处理

增删改查用到了哈希表这个数据结构提供的函数,而只有按空格将字符串分割这个函数是我们自行设计的,难度并不大

至此,kvstore的设计实现已经全部完成

推荐学习https://xxetb.xetslk.com/s/p5Ibb

Ongwu博客 版权声明:以上内容未经允许不得转载!授权事宜或对内容有异议或投诉,请联系站长,将尽快回复您,谢谢合作!

展开全文READ MORE
IrfanView(免费的图片查看处理工具) v4.67 中文直装版 Office Installer(Office安装工具) v1.15 by Ratiborus 绿色版

游客 回复需填写必要信息