diff --git a/src/bmz.c b/src/bmz.c index 4b6047d..27df260 100644 --- a/src/bmz.c +++ b/src/bmz.c @@ -87,7 +87,7 @@ cmph_t *bmz_new(cmph_config_t *mph, float c) // Mapping step cmph_uint32 biggest_g_value = 0; cmph_uint32 biggest_edge_value = 1; - iterations = 20; + iterations = 100; if (mph->verbosity) { fprintf(stderr, "Entering mapping step for mph creation of %u keys with graph sized %u\n", bmz->m, bmz->n); @@ -143,7 +143,7 @@ cmph_t *bmz_new(cmph_config_t *mph, float c) used_edges = (cmph_uint8 *)malloc(bmz->m/8 + 1); memset(used_edges, 0, bmz->m/8 + 1); free(bmz->g); - bmz->g = malloc(bmz->n * sizeof(cmph_uint32)); + bmz->g = calloc(bmz->n, sizeof(cmph_uint32)); assert(bmz->g); for (i = 0; i < bmz->n; ++i) // critical nodes { @@ -172,7 +172,10 @@ cmph_t *bmz_new(cmph_config_t *mph, float c) }while(restart_mapping && iterations_map > 0); graph_destroy(bmz->graph); bmz->graph = NULL; - if (iterations_map == 0) return NULL; + if (iterations_map == 0) + { + return NULL; + } mphf = (cmph_t *)malloc(sizeof(cmph_t)); mphf->algo = mph->algo; bmzf = (bmz_data_t *)malloc(sizeof(bmz_data_t)); @@ -368,15 +371,13 @@ static void bmz_traverse(bmz_config_data_t *bmz, cmph_uint8 * used_edges, cmph_u { graph_iterator_t it = graph_neighbors_it(bmz->graph, v); cmph_uint32 neighbor = 0; - cmph_uint32 gvalue; while((neighbor = graph_next_neighbor(bmz->graph, &it)) != GRAPH_NO_NEIGHBOR) { if(GETBIT(visited,neighbor)) continue; DEBUGP("Visiting neighbor %u\n", neighbor); *unused_edge_index = next_unused_edge(bmz, used_edges, *unused_edge_index); - if(*unused_edge_index < bmz->g[v]) gvalue = *unused_edge_index + bmz->m; - else gvalue = *unused_edge_index; - bmz->g[neighbor] = gvalue - bmz->g[v]; + bmz->g[neighbor] = *unused_edge_index - bmz->g[v]; + if (bmz->g[neighbor] >= bmz->m) bmz->g[neighbor] += bmz->m; SETBIT(visited, neighbor); (*unused_edge_index)++; bmz_traverse(bmz, used_edges, neighbor, unused_edge_index, visited); diff --git a/src/brz.c b/src/brz.c index 17b7b68..c496a3b 100755 --- a/src/brz.c +++ b/src/brz.c @@ -1,3 +1,4 @@ + #include "graph.h" #include "bmz.h" #include "bmz_structs.h" @@ -17,8 +18,11 @@ //#define DEBUG #include "debug.h" -static int brz_before_gen_graphs(cmph_config_t *mph, cmph_uint32 * disksize, cmph_uint32 * diskoffset); -static void brz_gen_graphs(cmph_config_t *mph, cmph_uint32 * disksize, cmph_uint32 * diskoffset, FILE * graphs_fd); +static int brz_gen_graphs(cmph_config_t *mph, FILE * graphs_fd); +static cmph_uint32 brz_min_index(cmph_uint32 * vector, cmph_uint32 n); +static void flush_buffer(cmph_uint8 *buffer, cmph_uint32 *memory_usage, FILE * graphs_fd); +static void save_in_disk(cmph_uint8 *buffer, cmph_uint8 * key, cmph_uint32 keylen, cmph_uint32 *memory_usage, cmph_uint32 memory_availability, FILE * graphs_fd); +static char * brz_read_key(FILE * fd); static char ** brz_read_keys_vd(FILE * graphs_fd, cmph_uint8 nkeys); static void brz_destroy_keys_vd(char ** keys_vd, cmph_uint8 nkeys); static void brz_copy_partial_mphf(brz_config_data_t *brz, bmz_data_t * bmzf, cmph_uint32 index, cmph_io_adapter_t *source); @@ -83,6 +87,7 @@ static cmph_uint8 brz_verify_mphf(cmph_t * mphf, cmph_io_adapter_t *source) hashtable[h] = 1; source->dispose(source->data, buf, buflen); } + fprintf(stderr, "\n===============================================================================\n"); free(hashtable); return 1; } @@ -132,8 +137,8 @@ cmph_t *brz_new(cmph_config_t *mph, float c) brz_data_t *brzf = NULL; cmph_uint32 i; cmph_uint32 iterations = 20; - cmph_uint32 * disksize = NULL; - cmph_uint32 * diskoffset = NULL; +/* cmph_uint32 * disksize = NULL; + cmph_uint32 * diskoffset = NULL;*/ cmph_io_adapter_t *source = NULL; cmph_config_t *config = NULL; @@ -146,30 +151,30 @@ cmph_t *brz_new(cmph_config_t *mph, float c) brz->c = c; brz->m = mph->key_source->nkeys; DEBUGP("m: %u\n", brz->m); - brz->k = ceil(brz->m/128); + brz->k = ceil(brz->m/170); DEBUGP("k: %u\n", brz->k); - brz->size = (cmph_uint8 *) malloc(sizeof(cmph_uint8)*brz->k); - brz->offset = (cmph_uint32 *)malloc(sizeof(cmph_uint32)*brz->k); + brz->size = (cmph_uint8 *) calloc(brz->k, sizeof(cmph_uint8)); - disksize = (cmph_uint32 *)malloc(sizeof(cmph_uint32)*brz->k); - diskoffset = (cmph_uint32 *)malloc(sizeof(cmph_uint32)*brz->k); - - for(i = 0; i < brz->k; ++i) + // Clustering the keys by graph id. + if (mph->verbosity) { - brz->size[i] = 0; - brz->offset[i] = 0; - disksize[i] = 0; - diskoffset[i] = 0; + fprintf(stderr, "Partioning the set of keys.\n"); + } + graphs_fd = fopen("/mnt/hd4/fbotelho/cmph.tmp", "wb"); + if (graphs_fd == NULL) + { + free(brz->size); + fprintf(stderr, "Unable to open file %s\n", "/mnt/hd4/fbotelho/cmph.tmp"); + return NULL; } - // Creating the external graphs. while(1) { int ok; DEBUGP("hash function 3\n"); brz->h3 = hash_state_new(brz->hashfuncs[2], brz->k); DEBUGP("Generating graphs\n"); - ok = brz_before_gen_graphs(mph, disksize, diskoffset); + ok = brz_gen_graphs(mph, graphs_fd); if (!ok) { --iterations; @@ -184,59 +189,50 @@ cmph_t *brz_new(cmph_config_t *mph, float c) } else break; } - + fclose(graphs_fd); if (iterations == 0) { DEBUGP("Graphs with more than 255 keys were created in all 20 iterations\n"); free(brz->size); - free(brz->offset); - free(disksize); - free(diskoffset); return NULL; } + DEBUGP("Graphs generated\n"); + + brz->offset = (cmph_uint32 *)calloc(brz->k, sizeof(cmph_uint32)); + for (i = 1; i < brz->k; ++i) + { + brz->offset[i] = brz->size[i-1] + brz->offset[i-1]; + } -// graphs_fd = fopen("/colecao/fbotelho/cmph.tmp", "wb"); -/* graphs_fd = fopen("cmph.tmp", "wb");*/ - graphs_fd = fopen("/var/tmp/cmph.tmp", "wb"); - if (graphs_fd == NULL) - { - free(brz->size); - free(brz->offset); - free(disksize); - free(diskoffset); - fprintf(stderr, "Unable to open file %s\n", "/colecao/fbotelho/cmph.tmp"); - return NULL; - } - // Clustering the keys by graph id. - brz_gen_graphs(mph, disksize, diskoffset, graphs_fd); - free(disksize); - free(diskoffset); - DEBUGP("Graphs generated\n"); - fclose(graphs_fd); -// graphs_fd = fopen("/colecao/fbotelho/cmph.tmp", "rb"); -/* graphs_fd = fopen("cmph.tmp", "rb");*/ - graphs_fd = fopen("/var/tmp/cmph.tmp", "rb"); // codigo do algoritmo... + graphs_fd = fopen("/mnt/hd4/fbotelho/cmph.tmp", "rb"); brz->h1 = (hash_state_t **)malloc(sizeof(hash_state_t *)*brz->k); brz->h2 = (hash_state_t **)malloc(sizeof(hash_state_t *)*brz->k); brz->g = (cmph_uint8 **) malloc(sizeof(cmph_uint8 *) *brz->k); + if (mph->verbosity) + { + fprintf(stderr, "\nGenerating mphf.\n"); + } DEBUGP("Generating mphf\n"); for(i = 0; i < brz->k; i++) { + if (mph->verbosity) fprintf(stderr, "\tMPHF %u in %u was generated.\n", i+1, brz->k); cmph_uint32 j; bmz_data_t * bmzf = NULL; cmph_uint8 nkeys = brz->size[i]; if (nkeys == 0) continue; keys_vd = brz_read_keys_vd(graphs_fd, nkeys); + // Source of keys source = cmph_io_vector_adapter(keys_vd, (cmph_uint32)nkeys); config = cmph_config_new(source); cmph_config_set_algo(config, CMPH_BMZ); cmph_config_set_graphsize(config, c); + //cmph_config_set_verbosity(config, 1); mphf_tmp = cmph_new(config); bmzf = (bmz_data_t *)mphf_tmp->data; //assert(brz_verify_mphf(mphf_tmp, source)); - brz_copy_partial_mphf(brz, bmzf, i, source); // implementar + brz_copy_partial_mphf(brz, bmzf, i, source); cmph_config_destroy(config); brz_destroy_keys_vd(keys_vd, nkeys); free(keys_vd); @@ -275,98 +271,265 @@ cmph_t *brz_new(cmph_config_t *mph, float c) return mphf; } -static int brz_before_gen_graphs(cmph_config_t *mph, cmph_uint32 * disksize, cmph_uint32 * diskoffset) +static int brz_gen_graphs(cmph_config_t *mph, FILE * graphs_fd) { - cmph_uint32 e; - brz_config_data_t *brz = (brz_config_data_t *)mph->data; - mph->key_source->rewind(mph->key_source->data); - DEBUGP("Generating information before the keys partition\n"); - for (e = 0; e < brz->m; ++e) - { - cmph_uint32 h3; - cmph_uint32 keylen; - char *key; - mph->key_source->read(mph->key_source->data, &key, &keylen); - h3 = hash(brz->h3, key, keylen) % brz->k; -// if(h3 == 6) -// { -// DEBUGP("key = %s\n", key); -// DEBUGP("keylen = %u\n", keylen + 1); -// } - - mph->key_source->dispose(mph->key_source->data, key, keylen); - if (brz->size[h3] == 255) return 0; - brz->size[h3] = brz->size[h3] + 1; - disksize[h3] = disksize[h3] + keylen + 1; -// if(h3 == 6) -// { -// DEBUGP("disksize[%u]=%u \n", h3, disksize[h3]); -// } - - } -// DEBUGP("size:%u offset: %u\n", brz->size[0], brz->offset[0]); - for (e = 1; e < brz->k; ++e) - { - brz->offset[e] = brz->size[e-1] + brz->offset[e-1]; - diskoffset[e] = disksize[e-1] + diskoffset[e-1]; -/* DEBUGP("disksize[%u]=%u diskoffset[%u]: %u\n", e, disksize[e], e, diskoffset[e]); - DEBUGP("size[%u]=%u offset[%u]: %u\n", e, brz->size[e], e, brz->offset[e]);*/ - } - return 1; -} - -// disksize nao esta sendo usado ainda. Sera usado qd incluir os buffers. -static void brz_gen_graphs(cmph_config_t *mph, cmph_uint32 * disksize, cmph_uint32 * diskoffset, FILE * graphs_fd) -{ - cmph_uint32 e; +#pragma pack(1) + cmph_uint32 i, e; brz_config_data_t *brz = (brz_config_data_t *)mph->data; + cmph_uint32 memory_availability = 209715200;//200MB //104857600;//100MB //524288000; // 500MB //209715200; // 200 MB + cmph_uint32 memory_usage = 0; + cmph_uint32 nkeys_in_buffer = 0; + cmph_uint8 *buffer = (cmph_uint8 *)malloc(memory_availability); + cmph_uint32 *buckets_size = (cmph_uint32 *)calloc(brz->k, sizeof(cmph_uint32)); + cmph_uint32 *keys_index = NULL; + cmph_uint8 **buffer_merge = NULL; + cmph_uint32 *buffer_h3 = NULL; + cmph_uint32 nflushes = 0; + cmph_uint32 h3; + FILE * tmp_fd = NULL; + FILE ** tmp_fds = NULL; + char filename[100]; + char *key = NULL; + cmph_uint32 keylen; + mph->key_source->rewind(mph->key_source->data); DEBUGP("Generating graphs from %u keys\n", brz->m); + // Partitioning for (e = 0; e < brz->m; ++e) { - cmph_uint32 h3; - cmph_uint32 keylen; - char *key; mph->key_source->read(mph->key_source->data, &key, &keylen); + + /* Buffers management */ + if (memory_usage + keylen + 1 > memory_availability) // flush buffers + { + fprintf(stderr, "Flushing %u\n", nkeys_in_buffer); + cmph_uint32 value = buckets_size[0]; + cmph_uint32 sum = 0; + + cmph_uint32 keylen1 = 0; + buckets_size[0] = 0; + for(i = 1; i < brz->k; i++) + { + if(buckets_size[i] == 0) continue; + sum += value; + value = buckets_size[i]; + buckets_size[i] = sum; + + } + memory_usage = 0; + keys_index = (cmph_uint32 *)calloc(nkeys_in_buffer, sizeof(cmph_uint32)); + for(i = 0; i < nkeys_in_buffer; i++) + { + keylen1 = strlen(buffer + memory_usage); + h3 = hash(brz->h3, buffer + memory_usage, keylen1) % brz->k; + keys_index[buckets_size[h3]] = memory_usage; + buckets_size[h3]++; + memory_usage = memory_usage + keylen1 + 1; + } + sprintf(filename, "/mnt/hd4/fbotelho/%u.cmph",nflushes); + tmp_fd = fopen(filename, "wb"); + for(i = 0; i < nkeys_in_buffer; i++) + { + keylen1 = strlen(buffer + keys_index[i]) + 1; + fwrite(buffer + keys_index[i], 1, keylen1, tmp_fd); + } + nkeys_in_buffer = 0; + memory_usage = 0; + bzero(buckets_size, brz->k*sizeof(cmph_uint32)); + nflushes++; + free(keys_index); + fclose(tmp_fd); + fprintf(stderr, "Flushing is over\n"); + } + //fprintf(stderr, "Storing read Key\n"); + memcpy(buffer + memory_usage, key, keylen + 1); + memory_usage = memory_usage + keylen + 1; h3 = hash(brz->h3, key, keylen) % brz->k; -/* if(h3 == 6) + if (brz->size[h3] == 255) { - DEBUGP("key = %s\n", key); - DEBUGP("keylen = %u\n", keylen + 1); - }*/ - fseek(graphs_fd, diskoffset[h3], SEEK_SET); - fwrite(key, sizeof(char), keylen + 1, graphs_fd); -/* if(h3 == 6) - { - DEBUGP("diskoffset[%u]=%u \n", h3, diskoffset[h3]); - }*/ - diskoffset[h3] = diskoffset[h3] + keylen + 1; + free(buffer); + free(buckets_size); + return 0; + } + brz->size[h3] = brz->size[h3] + 1; + buckets_size[h3] ++; + nkeys_in_buffer++; + mph->key_source->dispose(mph->key_source->data, key, keylen); } + + if (memory_usage != 0) // flush buffers + { + fprintf(stderr, "Flushing %u\n", nkeys_in_buffer); + cmph_uint32 value = buckets_size[0]; + cmph_uint32 sum = 0; + cmph_uint32 keylen1 = 0; + buckets_size[0] = 0; + for(i = 1; i < brz->k; i++) + { + if(buckets_size[i] == 0) continue; + sum += value; + value = buckets_size[i]; + buckets_size[i] = sum; + } + memory_usage = 0; + keys_index = (cmph_uint32 *)calloc(nkeys_in_buffer, sizeof(cmph_uint32)); + for(i = 0; i < nkeys_in_buffer; i++) + { + keylen1 = strlen(buffer + memory_usage); + h3 = hash(brz->h3, buffer + memory_usage, keylen1) % brz->k; + keys_index[buckets_size[h3]] = memory_usage; + buckets_size[h3]++; + memory_usage = memory_usage + keylen1 + 1; + } + sprintf(filename, "/mnt/hd4/fbotelho/%u.cmph",nflushes); + tmp_fd = fopen(filename, "wb"); + for(i = 0; i < nkeys_in_buffer; i++) + { + keylen1 = strlen(buffer + keys_index[i]) + 1; + fwrite(buffer + keys_index[i], 1, keylen1, tmp_fd); + } + nkeys_in_buffer = 0; + memory_usage = 0; + bzero(buckets_size, brz->k*sizeof(cmph_uint32)); + nflushes++; + free(keys_index); + fclose(tmp_fd); + fprintf(stderr, "Flushing is over\n"); + } + free(buffer); + free(buckets_size); + if(nflushes > 1024) return 0; // Too many files generated. + + // Merging + fprintf(stderr, "\nMerging files\n"); + tmp_fds = (FILE **)calloc(nflushes, sizeof(FILE *)); + buffer_merge = (cmph_uint8 **)calloc(nflushes, sizeof(cmph_uint8 *)); + buffer_h3 = (cmph_uint32 *)calloc(nflushes, sizeof(cmph_uint32)); + + for(i = 0; i < nflushes; i++) + { + sprintf(filename, "/mnt/hd4/fbotelho/%u.cmph",i); + tmp_fds[i] = fopen(filename, "rb"); + key = brz_read_key(tmp_fds[i]); + keylen = strlen(key); + h3 = hash(brz->h3, key, keylen) % brz->k; + buffer_h3[i] = h3; + buffer_merge[i] = (cmph_uint8 *)calloc(keylen + 1, sizeof(cmph_uint8)); + memcpy(buffer_merge[i], key, keylen + 1); + free(key); + } + e = 0; + buffer = (cmph_uint8 *)malloc(memory_availability); + while(e < brz->m) + { + i = brz_min_index(buffer_h3, nflushes); + key = brz_read_key(tmp_fds[i]); + if(key) + { + while(key) + { + keylen = strlen(key); + h3 = hash(brz->h3, key, keylen) % brz->k; + if (h3 != buffer_h3[i]) break; + save_in_disk(buffer, key, keylen, &memory_usage, memory_availability, graphs_fd); + //fwrite(key, 1, keylen + 1, graphs_fd); + e++; + free(key); + key = brz_read_key(tmp_fds[i]); + } + if (key) + { + save_in_disk(buffer, buffer_merge[i], strlen(buffer_merge[i]), &memory_usage, memory_availability, graphs_fd); + //fwrite(buffer_merge[i], 1, strlen(buffer_merge[i]) + 1, graphs_fd); + e++; + buffer_h3[i] = h3; + free(buffer_merge[i]); + buffer_merge[i] = (cmph_uint8 *)calloc(keylen + 1, sizeof(cmph_uint8)); + memcpy(buffer_merge[i], key, keylen + 1); + free(key); + } + } +/* fprintf(stderr, "BOSTA %u %u e: %u\n", i, buffer_h3[i], e);*/ + if(!key) + { + save_in_disk(buffer, buffer_merge[i], strlen(buffer_merge[i]), &memory_usage, memory_availability, graphs_fd); + //fwrite(buffer_merge[i], 1, strlen(buffer_merge[i]) + 1, graphs_fd); + e++; + buffer_h3[i] = UINT_MAX; + free(buffer_merge[i]); + buffer_merge[i] = NULL; + } + } + for(i = 0; i < nflushes; i++) fclose(tmp_fds[i]); + flush_buffer(buffer, &memory_usage, graphs_fd); + free(tmp_fds); + free(buffer); + free(buffer_merge); + free(buffer_h3); + return 1; +#pragma pack() +} + +static void flush_buffer(cmph_uint8 *buffer, cmph_uint32 *memory_usage, FILE * graphs_fd) +{ + fwrite(buffer, 1, *memory_usage, graphs_fd); + *memory_usage = 0; +} + +static void save_in_disk(cmph_uint8 *buffer, cmph_uint8 * key, cmph_uint32 keylen, cmph_uint32 * memory_usage, + cmph_uint32 memory_availability, FILE * graphs_fd) +{ + if(*memory_usage + keylen + 1 > memory_availability) + { + flush_buffer(buffer, memory_usage, graphs_fd); + } + memcpy(buffer + *memory_usage, key, keylen + 1); + *memory_usage = *memory_usage + keylen + 1; +} + +static cmph_uint32 brz_min_index(cmph_uint32 * vector, cmph_uint32 n) +{ + cmph_uint32 i, min_index = 0; + for(i = 1; i < n; i++) + { + if(vector[i] < vector[min_index]) min_index = i; + } + return min_index; +} + +static char * brz_read_key(FILE * fd) +{ + char * buf = (char *)malloc(BUFSIZ); + cmph_uint32 buf_pos = 0; + char c; + while(1) + { + fread(&c, sizeof(char), 1, fd); + if(feof(fd)) + { + free(buf); + return NULL; + } + buf[buf_pos++] = c; + if(c == '\0') break; + if(buf_pos % BUFSIZ == 0) buf = (char *)realloc(buf, buf_pos + BUFSIZ); + } + return buf; } static char ** brz_read_keys_vd(FILE * graphs_fd, cmph_uint8 nkeys) { char ** keys_vd = (char **)malloc(sizeof(char *)*nkeys); - cmph_uint8 i; + cmph_uint8 i; for(i = 0; i < nkeys; i++) { - char * buf = (char *)malloc(BUFSIZ); - cmph_uint32 buf_pos = 0; - char c; - while(1) - { - - fread(&c, sizeof(char), 1, graphs_fd); - buf[buf_pos++] = c; - if(c == '\0') break; - if(buf_pos % BUFSIZ == 0) buf = (char *)realloc(buf, buf_pos + BUFSIZ); - } + char * buf = brz_read_key(graphs_fd); keys_vd[i] = (char *)malloc(strlen(buf) + 1); strcpy(keys_vd[i], buf); - free(buf); - } + free(buf); + } return keys_vd; } diff --git a/src/main.c b/src/main.c index c010123..ed24a72 100644 --- a/src/main.c +++ b/src/main.c @@ -248,6 +248,7 @@ int main(int argc, char **argv) cmph_uint32 buflen = 0; source->read(source->data, &buf, &buflen); h = cmph_search(mphf, buf, buflen); + assert(h < source->nkeys); if(hashtable[h])fprintf(stderr, "collision: %u\n",h); assert(hashtable[h]==0); hashtable[h] = 1;