mirror of
https://github.com/xcat2/confluent.git
synced 2026-06-02 00:08:35 +00:00
Various cleanups for urlmount
This commit is contained in:
@@ -20,15 +20,21 @@
|
||||
#include <fuse.h>
|
||||
#include <curl/curl.h>
|
||||
#include <errno.h>
|
||||
#include <fcntl.h>
|
||||
#include <stdio.h>
|
||||
#include <stdarg.h>
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include <sys/stat.h>
|
||||
#include <math.h>
|
||||
#include <pthread.h>
|
||||
#include <time.h>
|
||||
#include <unistd.h>
|
||||
|
||||
CURL *curl;
|
||||
|
||||
char curlerror[CURL_ERROR_SIZE];
|
||||
char recheckerror[CURL_ERROR_SIZE];
|
||||
curl_off_t filesize;
|
||||
|
||||
typedef struct downloadbuffer {
|
||||
@@ -42,11 +48,34 @@ typedef struct downloadbuffer {
|
||||
static char filename[MAX_FILE_LEN + 1];
|
||||
static int urlidx, newidx;
|
||||
static char* urls[MAX_URL_PATHS + 1];
|
||||
static pthread_mutex_t url_state_lock = PTHREAD_MUTEX_INITIALIZER;
|
||||
|
||||
|
||||
static void log_status(const char *priority, const char *fmt, ...) {
|
||||
va_list ap;
|
||||
FILE *fd = fopen("/dev/kmsg", "w+");
|
||||
|
||||
if (fd == NULL)
|
||||
fd = stderr;
|
||||
|
||||
fprintf(fd, "%surlmount: ", priority);
|
||||
va_start(ap, fmt);
|
||||
vfprintf(fd, fmt, ap);
|
||||
va_end(ap);
|
||||
fprintf(fd, "\n");
|
||||
|
||||
if (fd != stderr)
|
||||
fclose(fd);
|
||||
}
|
||||
|
||||
|
||||
void *http_rechecker(void *argp) {
|
||||
CURL *checkurl;
|
||||
int current_newidx;
|
||||
int current_urlidx;
|
||||
int tmpidx, tmpval;
|
||||
|
||||
(void)argp;
|
||||
tmpidx = open("/dev/urandom", O_RDONLY);
|
||||
if (tmpidx <= 0 || read(tmpidx, (char*)&tmpval, 4) < 0)
|
||||
tmpval = time(NULL) & 0xffffffff;
|
||||
@@ -54,7 +83,11 @@ void *http_rechecker(void *argp) {
|
||||
close(tmpidx);
|
||||
srand(tmpval);
|
||||
checkurl = curl_easy_init();
|
||||
if (curl_easy_setopt(checkurl, CURLOPT_ERRORBUFFER, curlerror) != CURLE_OK) {
|
||||
if (checkurl == NULL) {
|
||||
fprintf(stderr, "Unable to initialize CURL for rechecker\n");
|
||||
return NULL;
|
||||
}
|
||||
if (curl_easy_setopt(checkurl, CURLOPT_ERRORBUFFER, recheckerror) != CURLE_OK) {
|
||||
fprintf(stderr, "Error buffer\n");
|
||||
exit(1);
|
||||
}
|
||||
@@ -74,15 +107,24 @@ void *http_rechecker(void *argp) {
|
||||
}
|
||||
while (1) {
|
||||
sleep(25 + tmpval % 10); // Spread out retries across systems
|
||||
|
||||
pthread_mutex_lock(&url_state_lock);
|
||||
current_urlidx = urlidx;
|
||||
current_newidx = newidx;
|
||||
pthread_mutex_unlock(&url_state_lock);
|
||||
|
||||
tmpidx = 0;
|
||||
while (tmpidx < urlidx && tmpidx < newidx && urls[tmpidx] != NULL) {
|
||||
while (tmpidx < current_urlidx && tmpidx < current_newidx && urls[tmpidx] != NULL) {
|
||||
if (curl_easy_setopt(checkurl, CURLOPT_URL, urls[tmpidx]) != CURLE_OK) {
|
||||
tmpidx++;
|
||||
continue;
|
||||
}
|
||||
if (curl_easy_perform(checkurl) == CURLE_OK)
|
||||
if (curl_easy_perform(checkurl) == CURLE_OK) {
|
||||
pthread_mutex_lock(&url_state_lock);
|
||||
newidx = tmpidx;
|
||||
else
|
||||
pthread_mutex_unlock(&url_state_lock);
|
||||
break;
|
||||
} else
|
||||
tmpidx++;
|
||||
}
|
||||
}
|
||||
@@ -111,11 +153,10 @@ size_t fill_buffer(char *data, size_t size, size_t nmemb, downloadbuffer *userda
|
||||
static int http_read(const char *path, char *buf, size_t size, off_t offset,
|
||||
struct fuse_file_info *fi) {
|
||||
char headbuffer[512];
|
||||
const char *current_url;
|
||||
double dldbl = 0.0;
|
||||
int startidx;
|
||||
int reconnecting = 0;
|
||||
FILE* fd;
|
||||
startidx = urlidx;
|
||||
memset(buf, 0, size);
|
||||
curl_off_t downloaded;
|
||||
//Would be needed for multithread, however preferring to conserve
|
||||
@@ -128,7 +169,6 @@ static int http_read(const char *path, char *buf, size_t size, off_t offset,
|
||||
dlbuf.response = buf;
|
||||
dlbuf.completed = 0;
|
||||
dlbuf.total = size;
|
||||
fd = NULL;
|
||||
|
||||
if (strcmp(path, filename) != 0) return -ENOENT;
|
||||
memset(headbuffer, 0, 512);
|
||||
@@ -143,47 +183,53 @@ static int http_read(const char *path, char *buf, size_t size, off_t offset,
|
||||
fprintf(stderr, "Error setting writedata\n");
|
||||
exit(1);
|
||||
}
|
||||
|
||||
pthread_mutex_lock(&url_state_lock);
|
||||
startidx = urlidx;
|
||||
if (newidx < MAX_URL_PATHS) {
|
||||
reconnecting = 1;
|
||||
urlidx = newidx;
|
||||
newidx = MAX_URL_PATHS;
|
||||
fd = fopen("/dev/kmsg", "w+");
|
||||
fprintf(fd, "<5>urlmount: Connecting to %s\n", urls[urlidx]);
|
||||
fclose(fd);
|
||||
// if fail, carry on and take the error in curl_easy_perform instead
|
||||
if (curl_easy_setopt(curl, CURLOPT_URL, urls[urlidx]) != CURLE_OK) {}
|
||||
}
|
||||
current_url = urls[urlidx];
|
||||
pthread_mutex_unlock(&url_state_lock);
|
||||
|
||||
if (reconnecting) {
|
||||
log_status("<5>", "Connecting to %s", current_url);
|
||||
// if fail, carry on and take the error in curl_easy_perform instead
|
||||
if (curl_easy_setopt(curl, CURLOPT_URL, current_url) != CURLE_OK) {}
|
||||
}
|
||||
|
||||
while (curl_easy_perform(curl) != CURLE_OK) {
|
||||
int wrapped;
|
||||
|
||||
reconnecting = 1;
|
||||
fd = fopen("/dev/kmsg", "w+");
|
||||
dlbuf.completed = 0;
|
||||
fprintf(fd, "<4>urlmount: error while communicating with %s: %s\n", urls[urlidx], curlerror);
|
||||
fclose(fd);
|
||||
log_status("<4>", "error while communicating with %s: %s", current_url, curlerror);
|
||||
|
||||
pthread_mutex_lock(&url_state_lock);
|
||||
urlidx++;
|
||||
if (urlidx > MAX_URL_PATHS)
|
||||
if (urlidx > MAX_URL_PATHS || urls[urlidx] == NULL)
|
||||
urlidx = 0;
|
||||
if (urls[urlidx] == NULL)
|
||||
urlidx = 0;
|
||||
if (urlidx == startidx) {
|
||||
fd = fopen("/dev/kmsg", "w+");
|
||||
fprintf(fd, "<1>urlmount: All connections to source are down\n");
|
||||
fclose(fd);
|
||||
wrapped = (urlidx == startidx);
|
||||
current_url = urls[urlidx];
|
||||
pthread_mutex_unlock(&url_state_lock);
|
||||
|
||||
if (wrapped) {
|
||||
log_status("<1>", "All connections to source are down");
|
||||
sleep(10);
|
||||
}
|
||||
fd = fopen("/dev/kmsg", "w+");
|
||||
fprintf(fd, "<5>urlmount: Connecting to %s\n", urls[urlidx]);
|
||||
fclose(fd);
|
||||
|
||||
log_status("<5>", "Connecting to %s", current_url);
|
||||
if (urlidx > MAX_URL_PATHS) {
|
||||
fprintf(stderr, "Maximum url path exceeded\n");
|
||||
exit(1);
|
||||
}
|
||||
// ignore, let the curl_easy_perform get the error
|
||||
if (curl_easy_setopt(curl, CURLOPT_URL, urls[urlidx]) != CURLE_OK) {}
|
||||
if (curl_easy_setopt(curl, CURLOPT_URL, current_url) != CURLE_OK) {}
|
||||
}
|
||||
if (reconnecting) {
|
||||
fd = fopen("/dev/kmsg", "w+");
|
||||
fprintf(fd, "<5>urlmount: Successfully connected to %s\n", urls[urlidx]);
|
||||
fclose(fd);
|
||||
log_status("<5>", "Successfully connected to %s", current_url);
|
||||
}
|
||||
curl_easy_getinfo(curl, CURLINFO_SIZE_DOWNLOAD, &dldbl);
|
||||
downloaded = round(dldbl);
|
||||
@@ -206,9 +252,16 @@ static void* http_init(struct fuse_conn_info *conn) {
|
||||
// Because we fork, we need to redo curl
|
||||
// or else suffer the wrath of NSS TLS
|
||||
pthread_t tid;
|
||||
|
||||
(void)conn;
|
||||
curl_global_init(CURL_GLOBAL_DEFAULT);
|
||||
pthread_create(&tid, NULL, http_rechecker, NULL);
|
||||
pthread_detach(tid);
|
||||
curl = curl_easy_init();
|
||||
if (curl == NULL) {
|
||||
fprintf(stderr, "Unable to initialize CURL\n");
|
||||
exit(1);
|
||||
}
|
||||
if (curl_easy_setopt(curl, CURLOPT_ERRORBUFFER, curlerror) != CURLE_OK) {
|
||||
fprintf(stderr, "Failure initializing libcurl error buffor\n");
|
||||
exit(1);
|
||||
@@ -264,11 +317,15 @@ int main(int argc, char* argv[]) {
|
||||
unsigned int i;
|
||||
int j;
|
||||
j = 0;
|
||||
memset(urls, 0, 32*sizeof(char*));
|
||||
memset(urls, 0, sizeof(urls));
|
||||
urlidx = 0;
|
||||
newidx = MAX_URL_PATHS;
|
||||
curl_global_init(CURL_GLOBAL_DEFAULT);
|
||||
curl = curl_easy_init();
|
||||
if (!curl) {
|
||||
fprintf(stderr, "Unable to initialize CURL!\n");
|
||||
exit(1);
|
||||
}
|
||||
if (curl_easy_setopt(curl, CURLOPT_ERRORBUFFER, curlerror) != CURLE_OK) {
|
||||
fprintf(stderr, "Unable to set error buffer\n");
|
||||
exit(1);
|
||||
@@ -283,13 +340,19 @@ int main(int argc, char* argv[]) {
|
||||
fprintf(stderr, "Unable to setup curl timeout\n");
|
||||
exit(1);
|
||||
}
|
||||
memset(filename, 0, MAX_FILE_LEN);
|
||||
memset(filename, 0, sizeof(filename));
|
||||
for (i=0; i < argc; i++) {
|
||||
if (strstr(argv[i], ":") > 0) {
|
||||
if (strstr(argv[i], ":") != NULL) {
|
||||
if (j < MAX_URL_PATHS) {
|
||||
urls[j] = argv[i];
|
||||
tmp = strrchr(urls[j++], '/');
|
||||
tmp = strrchr(urls[j], '/');
|
||||
if (tmp == NULL || tmp[1] == '\0') {
|
||||
fprintf(stderr, "URL must include a filename: %s\n", urls[j]);
|
||||
exit(1);
|
||||
}
|
||||
j++;
|
||||
strncpy(filename, tmp, MAX_FILE_LEN);
|
||||
filename[MAX_FILE_LEN] = '\0';
|
||||
}
|
||||
//Request single threaded mode, as curl would need more
|
||||
// filehandles for multithread
|
||||
@@ -338,10 +401,6 @@ int main(int argc, char* argv[]) {
|
||||
fprintf(stderr, "Unable to reach designated URL\n");
|
||||
exit(1);
|
||||
}
|
||||
if (!curl) {
|
||||
fprintf(stderr, "Unable to initialize CURL!\n");
|
||||
exit(1);
|
||||
}
|
||||
curl_easy_cleanup(curl);
|
||||
curl_global_cleanup();
|
||||
fuse_main(argc, argv, &http_ops, NULL);
|
||||
|
||||
Reference in New Issue
Block a user