mirror of
https://github.com/xcat2/confluent.git
synced 2026-01-11 02:22:31 +00:00
Implement a test with retry for basic communication
confuesbox is likely to be a very early utility, and the relevant network is at high risk of being merely 'partially' up.
This commit is contained in:
@@ -2,20 +2,23 @@ package main
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"crypto/tls"
|
||||
"crypto/x509"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"net"
|
||||
"net/http"
|
||||
"crypto/x509"
|
||||
"crypto/tls"
|
||||
"os"
|
||||
"strings"
|
||||
"errors"
|
||||
"time"
|
||||
)
|
||||
|
||||
type ApiClient struct {
|
||||
server string
|
||||
server string
|
||||
urlserver string
|
||||
apikey string
|
||||
nodename string
|
||||
apikey string
|
||||
nodename string
|
||||
webclient *http.Client
|
||||
}
|
||||
|
||||
@@ -24,7 +27,7 @@ func NewApiClient(cafile string, keyfile string, nodename string, server string)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
cacerts := x509.NewCertPool()
|
||||
cacerts := x509.NewCertPool()
|
||||
cacerts.AppendCertsFromPEM(currcacerts)
|
||||
apikey := []byte("")
|
||||
if keyfile != "" {
|
||||
@@ -32,7 +35,7 @@ func NewApiClient(cafile string, keyfile string, nodename string, server string)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if apikey[len(apikey) - 1] == 0xa {
|
||||
if apikey[len(apikey)-1] == 0xa {
|
||||
apikey = apikey[:len(apikey)-1]
|
||||
}
|
||||
}
|
||||
@@ -40,7 +43,9 @@ func NewApiClient(cafile string, keyfile string, nodename string, server string)
|
||||
cinfo, err := os.ReadFile("/etc/confluent/confliuent.info")
|
||||
if err != nil {
|
||||
nodename, err = os.Hostname()
|
||||
if err != nil { return nil, err }
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
cinfolines := bytes.Split(cinfo, []byte("\n"))
|
||||
if bytes.Contains(cinfolines[0], []byte("NODENAME")) {
|
||||
@@ -48,6 +53,20 @@ func NewApiClient(cafile string, keyfile string, nodename string, server string)
|
||||
nodename = string(cnodebytes[0])
|
||||
}
|
||||
}
|
||||
// Test connectivity with up to 3 retries
|
||||
var conn net.Conn
|
||||
for i := 0; i < 3; i++ {
|
||||
conn, err = net.Dial("tcp", net.JoinHostPort(server, "443"))
|
||||
if err == nil {
|
||||
conn.Close()
|
||||
break
|
||||
}
|
||||
time.Sleep(5 * time.Second)
|
||||
fmt.Print("Connection attempt failed, retrying...\n")
|
||||
if i == 2 {
|
||||
return nil, fmt.Errorf("failed to connect after 3 attempts: %v", err)
|
||||
}
|
||||
}
|
||||
urlserver := server
|
||||
if strings.Contains(server, ":") {
|
||||
if strings.Contains(server, "%") && !strings.Contains(server, "%25") {
|
||||
@@ -58,10 +77,11 @@ func NewApiClient(cafile string, keyfile string, nodename string, server string)
|
||||
server = server[:strings.Index(server, "%")]
|
||||
}
|
||||
}
|
||||
|
||||
webclient := &http.Client{
|
||||
Transport: &http.Transport{
|
||||
TLSClientConfig: &tls.Config{
|
||||
RootCAs: cacerts,
|
||||
RootCAs: cacerts,
|
||||
ServerName: server,
|
||||
},
|
||||
},
|
||||
@@ -70,34 +90,42 @@ func NewApiClient(cafile string, keyfile string, nodename string, server string)
|
||||
return &vc, nil
|
||||
}
|
||||
|
||||
func (apiclient *ApiClient) RegisterKey(crypted string, hmac string) (error) {
|
||||
func (apiclient *ApiClient) RegisterKey(crypted string, hmac string) error {
|
||||
cryptbytes := []byte(crypted)
|
||||
cryptbuffer := bytes.NewBuffer(cryptbytes)
|
||||
_, err := apiclient.request("/confluent-api/self/registerapikey", "", cryptbuffer, "", hmac)
|
||||
return err
|
||||
}
|
||||
|
||||
func (apiclient *ApiClient) Fetch(url string, outputfile string, mime string, body io.Reader) (error) {
|
||||
func (apiclient *ApiClient) Fetch(url string, outputfile string, mime string, body io.Reader) error {
|
||||
outp, err := os.Create(outputfile)
|
||||
if err != nil { return err }
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer outp.Close()
|
||||
rsp, err := apiclient.request(url, mime, body, "", "")
|
||||
if err != nil { return err }
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = io.Copy(outp, rsp)
|
||||
return err
|
||||
}
|
||||
|
||||
func (apiclient *ApiClient) GrabText(url string, mime string, body io.Reader) (string, error){
|
||||
func (apiclient *ApiClient) GrabText(url string, mime string, body io.Reader) (string, error) {
|
||||
rsp, err := apiclient.request(url, mime, body, "", "")
|
||||
if err != nil { return "", err }
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
rspdata, err := io.ReadAll(rsp)
|
||||
if err != nil { return "", err }
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
rsptxt := string(rspdata)
|
||||
return rsptxt, nil
|
||||
}
|
||||
|
||||
func (apiclient *ApiClient) request(url string, mime string, body io.Reader, method string, hmac string) (io.ReadCloser, error) {
|
||||
if ! strings.Contains(url, "https://") {
|
||||
if !strings.Contains(url, "https://") {
|
||||
url = fmt.Sprintf("https://%s%s", apiclient.urlserver, url)
|
||||
}
|
||||
if method == "" {
|
||||
@@ -114,8 +142,12 @@ func (apiclient *ApiClient) request(url string, mime string, body io.Reader, met
|
||||
} else {
|
||||
rq, err = http.NewRequest(method, url, body)
|
||||
}
|
||||
if err != nil { return nil, err }
|
||||
if (mime != "") { rq.Header.Set("Accept", mime) }
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if mime != "" {
|
||||
rq.Header.Set("Accept", mime)
|
||||
}
|
||||
rq.Header.Set("CONFLUENT_NODENAME", apiclient.nodename)
|
||||
if len(hmac) > 0 {
|
||||
rq.Header.Set("CONFLUENT_CRYPTHMAC", hmac)
|
||||
@@ -124,11 +156,12 @@ func (apiclient *ApiClient) request(url string, mime string, body io.Reader, met
|
||||
rq.Header.Set("CONFLUENT_APIKEY", apiclient.apikey)
|
||||
}
|
||||
rsp, err := apiclient.webclient.Do(rq)
|
||||
if err != nil { return nil, err }
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if rsp.StatusCode >= 300 {
|
||||
err = errors.New(rsp.Status)
|
||||
return nil, err
|
||||
}
|
||||
return rsp.Body, err
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user