diff --git a/confluent_osdeploy/utils/urlmount.c b/confluent_osdeploy/utils/urlmount.c index 4292ed4e..38883af3 100644 --- a/confluent_osdeploy/utils/urlmount.c +++ b/confluent_osdeploy/utils/urlmount.c @@ -20,15 +20,21 @@ #include #include #include +#include +#include +#include #include #include +#include #include #include +#include #include CURL *curl; char curlerror[CURL_ERROR_SIZE]; +char recheckerror[CURL_ERROR_SIZE]; curl_off_t filesize; typedef struct downloadbuffer { @@ -42,11 +48,34 @@ typedef struct downloadbuffer { static char filename[MAX_FILE_LEN + 1]; static int urlidx, newidx; static char* urls[MAX_URL_PATHS + 1]; +static pthread_mutex_t url_state_lock = PTHREAD_MUTEX_INITIALIZER; + + +static void log_status(const char *priority, const char *fmt, ...) { + va_list ap; + FILE *fd = fopen("/dev/kmsg", "w+"); + + if (fd == NULL) + fd = stderr; + + fprintf(fd, "%surlmount: ", priority); + va_start(ap, fmt); + vfprintf(fd, fmt, ap); + va_end(ap); + fprintf(fd, "\n"); + + if (fd != stderr) + fclose(fd); +} void *http_rechecker(void *argp) { CURL *checkurl; + int current_newidx; + int current_urlidx; int tmpidx, tmpval; + + (void)argp; tmpidx = open("/dev/urandom", O_RDONLY); if (tmpidx <= 0 || read(tmpidx, (char*)&tmpval, 4) < 0) tmpval = time(NULL) & 0xffffffff; @@ -54,7 +83,11 @@ void *http_rechecker(void *argp) { close(tmpidx); srand(tmpval); checkurl = curl_easy_init(); - if (curl_easy_setopt(checkurl, CURLOPT_ERRORBUFFER, curlerror) != CURLE_OK) { + if (checkurl == NULL) { + fprintf(stderr, "Unable to initialize CURL for rechecker\n"); + return NULL; + } + if (curl_easy_setopt(checkurl, CURLOPT_ERRORBUFFER, recheckerror) != CURLE_OK) { fprintf(stderr, "Error buffer\n"); exit(1); } @@ -74,15 +107,24 @@ void *http_rechecker(void *argp) { } while (1) { sleep(25 + tmpval % 10); // Spread out retries across systems + + pthread_mutex_lock(&url_state_lock); + current_urlidx = urlidx; + current_newidx = newidx; + pthread_mutex_unlock(&url_state_lock); + tmpidx = 0; - while (tmpidx < urlidx && tmpidx < newidx && urls[tmpidx] != NULL) { + while (tmpidx < current_urlidx && tmpidx < current_newidx && urls[tmpidx] != NULL) { if (curl_easy_setopt(checkurl, CURLOPT_URL, urls[tmpidx]) != CURLE_OK) { tmpidx++; continue; } - if (curl_easy_perform(checkurl) == CURLE_OK) + if (curl_easy_perform(checkurl) == CURLE_OK) { + pthread_mutex_lock(&url_state_lock); newidx = tmpidx; - else + pthread_mutex_unlock(&url_state_lock); + break; + } else tmpidx++; } } @@ -111,11 +153,10 @@ size_t fill_buffer(char *data, size_t size, size_t nmemb, downloadbuffer *userda static int http_read(const char *path, char *buf, size_t size, off_t offset, struct fuse_file_info *fi) { char headbuffer[512]; + const char *current_url; double dldbl = 0.0; int startidx; int reconnecting = 0; - FILE* fd; - startidx = urlidx; memset(buf, 0, size); curl_off_t downloaded; //Would be needed for multithread, however preferring to conserve @@ -128,7 +169,6 @@ static int http_read(const char *path, char *buf, size_t size, off_t offset, dlbuf.response = buf; dlbuf.completed = 0; dlbuf.total = size; - fd = NULL; if (strcmp(path, filename) != 0) return -ENOENT; memset(headbuffer, 0, 512); @@ -143,47 +183,53 @@ static int http_read(const char *path, char *buf, size_t size, off_t offset, fprintf(stderr, "Error setting writedata\n"); exit(1); } + + pthread_mutex_lock(&url_state_lock); + startidx = urlidx; if (newidx < MAX_URL_PATHS) { reconnecting = 1; urlidx = newidx; newidx = MAX_URL_PATHS; - fd = fopen("/dev/kmsg", "w+"); - fprintf(fd, "<5>urlmount: Connecting to %s\n", urls[urlidx]); - fclose(fd); - // if fail, carry on and take the error in curl_easy_perform instead - if (curl_easy_setopt(curl, CURLOPT_URL, urls[urlidx]) != CURLE_OK) {} } + current_url = urls[urlidx]; + pthread_mutex_unlock(&url_state_lock); + + if (reconnecting) { + log_status("<5>", "Connecting to %s", current_url); + // if fail, carry on and take the error in curl_easy_perform instead + if (curl_easy_setopt(curl, CURLOPT_URL, current_url) != CURLE_OK) {} + } + while (curl_easy_perform(curl) != CURLE_OK) { + int wrapped; + reconnecting = 1; - fd = fopen("/dev/kmsg", "w+"); dlbuf.completed = 0; - fprintf(fd, "<4>urlmount: error while communicating with %s: %s\n", urls[urlidx], curlerror); - fclose(fd); + log_status("<4>", "error while communicating with %s: %s", current_url, curlerror); + + pthread_mutex_lock(&url_state_lock); urlidx++; - if (urlidx > MAX_URL_PATHS) + if (urlidx > MAX_URL_PATHS || urls[urlidx] == NULL) urlidx = 0; - if (urls[urlidx] == NULL) - urlidx = 0; - if (urlidx == startidx) { - fd = fopen("/dev/kmsg", "w+"); - fprintf(fd, "<1>urlmount: All connections to source are down\n"); - fclose(fd); + wrapped = (urlidx == startidx); + current_url = urls[urlidx]; + pthread_mutex_unlock(&url_state_lock); + + if (wrapped) { + log_status("<1>", "All connections to source are down"); sleep(10); } - fd = fopen("/dev/kmsg", "w+"); - fprintf(fd, "<5>urlmount: Connecting to %s\n", urls[urlidx]); - fclose(fd); + + log_status("<5>", "Connecting to %s", current_url); if (urlidx > MAX_URL_PATHS) { fprintf(stderr, "Maximum url path exceeded\n"); exit(1); } // ignore, let the curl_easy_perform get the error - if (curl_easy_setopt(curl, CURLOPT_URL, urls[urlidx]) != CURLE_OK) {} + if (curl_easy_setopt(curl, CURLOPT_URL, current_url) != CURLE_OK) {} } if (reconnecting) { - fd = fopen("/dev/kmsg", "w+"); - fprintf(fd, "<5>urlmount: Successfully connected to %s\n", urls[urlidx]); - fclose(fd); + log_status("<5>", "Successfully connected to %s", current_url); } curl_easy_getinfo(curl, CURLINFO_SIZE_DOWNLOAD, &dldbl); downloaded = round(dldbl); @@ -206,9 +252,16 @@ static void* http_init(struct fuse_conn_info *conn) { // Because we fork, we need to redo curl // or else suffer the wrath of NSS TLS pthread_t tid; + + (void)conn; curl_global_init(CURL_GLOBAL_DEFAULT); pthread_create(&tid, NULL, http_rechecker, NULL); + pthread_detach(tid); curl = curl_easy_init(); + if (curl == NULL) { + fprintf(stderr, "Unable to initialize CURL\n"); + exit(1); + } if (curl_easy_setopt(curl, CURLOPT_ERRORBUFFER, curlerror) != CURLE_OK) { fprintf(stderr, "Failure initializing libcurl error buffor\n"); exit(1); @@ -264,11 +317,15 @@ int main(int argc, char* argv[]) { unsigned int i; int j; j = 0; - memset(urls, 0, 32*sizeof(char*)); + memset(urls, 0, sizeof(urls)); urlidx = 0; newidx = MAX_URL_PATHS; curl_global_init(CURL_GLOBAL_DEFAULT); curl = curl_easy_init(); + if (!curl) { + fprintf(stderr, "Unable to initialize CURL!\n"); + exit(1); + } if (curl_easy_setopt(curl, CURLOPT_ERRORBUFFER, curlerror) != CURLE_OK) { fprintf(stderr, "Unable to set error buffer\n"); exit(1); @@ -283,13 +340,19 @@ int main(int argc, char* argv[]) { fprintf(stderr, "Unable to setup curl timeout\n"); exit(1); } - memset(filename, 0, MAX_FILE_LEN); + memset(filename, 0, sizeof(filename)); for (i=0; i < argc; i++) { - if (strstr(argv[i], ":") > 0) { + if (strstr(argv[i], ":") != NULL) { if (j < MAX_URL_PATHS) { urls[j] = argv[i]; - tmp = strrchr(urls[j++], '/'); + tmp = strrchr(urls[j], '/'); + if (tmp == NULL || tmp[1] == '\0') { + fprintf(stderr, "URL must include a filename: %s\n", urls[j]); + exit(1); + } + j++; strncpy(filename, tmp, MAX_FILE_LEN); + filename[MAX_FILE_LEN] = '\0'; } //Request single threaded mode, as curl would need more // filehandles for multithread @@ -338,10 +401,6 @@ int main(int argc, char* argv[]) { fprintf(stderr, "Unable to reach designated URL\n"); exit(1); } - if (!curl) { - fprintf(stderr, "Unable to initialize CURL!\n"); - exit(1); - } curl_easy_cleanup(curl); curl_global_cleanup(); fuse_main(argc, argv, &http_ops, NULL);