0
点赞
收藏
分享

微信扫一扫

2022-02-23 influxdb单节点写数据的调用流程

芝婵 2022-02-23 阅读 40

摘要:

在研究influxdb集群写数据前, 先研究单节点的写数据流程

写数据的函数调用栈:

(gdb) b points_writer.go:345
Breakpoint 1 at 0xdb321b: file /root/work/ndb-influxdb-instance/src/influxdb/influxdb-1.8.4/coordinator/points_writer.go, line 345.
(gdb) c
Continuing.

Thread 1 "influxd" hit Breakpoint 1, influxdb.cluster/coordinator.(*PointsWriter).WritePointsPrivileged (w=0xc000315340, database=..., retentionPolicy=..., consistencyLevel=3, points=..., ~r4=...)
    at /root/work/ndb-influxdb-instance/src/influxdb/influxdb-1.8.4/coordinator/points_writer.go:345
345		shardMappings, err := w.MapShards(&WritePointsRequest{Database: database, RetentionPolicy: retentionPolicy, Points: points})
(gdb) bt
#0  influxdb.cluster/coordinator.(*PointsWriter).WritePointsPrivileged (w=0xc000315340, database=..., retentionPolicy=..., consistencyLevel=3, points=..., ~r4=...)
    at /root/work/ndb-influxdb-instance/src/influxdb/influxdb-1.8.4/coordinator/points_writer.go:345
#1  0x0000000000db2fe8 in influxdb.cluster/coordinator.(*PointsWriter).WritePoints (w=0xc000315340, database=..., retentionPolicy=..., consistencyLevel=3, user=..., points=..., ~r5=...)
    at /root/work/ndb-influxdb-instance/src/influxdb/influxdb-1.8.4/coordinator/points_writer.go:329
#2  0x000000000129b6c8 in influxdb.cluster/services/httpd.(*Handler).serveWrite (h=0xc0001d8870, w=..., r=0xc0000a2800, user=...)
    at /root/work/ndb-influxdb-instance/src/influxdb/influxdb-1.8.4/services/httpd/handler.go:817
#3  0x00000000012b6769 in influxdb.cluster/services/httpd.(*Handler).serveWrite-fm (w=..., r=0xc0000a2800, user=...) at /root/work/ndb-influxdb-instance/src/influxdb/influxdb-1.8.4/services/httpd/handler.go:713
#4  0x00000000012a6983 in influxdb.cluster/services/httpd.authenticate.func1 (w=..., r=0xc0000a2800) at /root/work/ndb-influxdb-instance/src/influxdb/influxdb-1.8.4/services/httpd/handler.go:1559
#5  0x00000000007f3d43 in net/http.HandlerFunc.ServeHTTP (f={void (net/http.ResponseWriter, net/http.Request *)} 0xc0004f2c68, w=..., r=0xc0000a2800) at /usr/local/go/src/net/http/server.go:2046
#6  0x00000000012a7e14 in influxdb.cluster/services/httpd.(*Handler).responseWriter.func1 (w=..., r=0xc0000a2800) at /root/work/ndb-influxdb-instance/src/influxdb/influxdb-1.8.4/services/httpd/handler.go:1739
#7  0x00000000007f3d43 in net/http.HandlerFunc.ServeHTTP (f={void (net/http.ResponseWriter, net/http.Request *)} 0xc0004f2ce0, w=..., r=0xc0000a2800) at /usr/local/go/src/net/http/server.go:2046
#8  0x0000000001293404 in influxdb.cluster/services/httpd.gzipFilter.func1 (w=..., r=0xc0000a2800) at /root/work/ndb-influxdb-instance/src/influxdb/influxdb-1.8.4/services/httpd/gzip.go:39
#9  0x00000000007f3d43 in net/http.HandlerFunc.ServeHTTP (f={void (net/http.ResponseWriter, net/http.Request *)} 0xc0004f2e30, w=..., r=0xc0000a2800) at /usr/local/go/src/net/http/server.go:2046
#10 0x00000000012a73c2 in influxdb.cluster/services/httpd.cors.func1 (w=..., r=0xc0000a2800) at /root/work/ndb-influxdb-instance/src/influxdb/influxdb-1.8.4/services/httpd/handler.go:1681
#11 0x00000000007f3d43 in net/http.HandlerFunc.ServeHTTP (f={void (net/http.ResponseWriter, net/http.Request *)} 0xc0004f3020, w=..., r=0xc0000a2800) at /usr/local/go/src/net/http/server.go:2046
#12 0x00000000012a76a7 in influxdb.cluster/services/httpd.requestID.func1 (w=..., r=0xc0000a2800) at /root/work/ndb-influxdb-instance/src/influxdb/influxdb-1.8.4/services/httpd/handler.go:1712
#13 0x00000000007f3d43 in net/http.HandlerFunc.ServeHTTP (f={void (net/http.ResponseWriter, net/http.Request *)} 0xc0004f30d0, w=..., r=0xc0000a2800) at /usr/local/go/src/net/http/server.go:2046
#14 0x00000000012a7934 in influxdb.cluster/services/httpd.(*Handler).logging.func1 (w=..., r=0xc0000a2800) at /root/work/ndb-influxdb-instance/src/influxdb/influxdb-1.8.4/services/httpd/handler.go:1720
#15 0x00000000007f3d43 in net/http.HandlerFunc.ServeHTTP (f={void (net/http.ResponseWriter, net/http.Request *)} 0xc0004f3268, w=..., r=0xc0000a2800) at /usr/local/go/src/net/http/server.go:2046
#16 0x00000000012a82bf in influxdb.cluster/services/httpd.(*Handler).recovery.func1 (w=..., r=0xc0000a2800) at /root/work/ndb-influxdb-instance/src/influxdb/influxdb-1.8.4/services/httpd/handler.go:1776
#17 0x00000000007f3d43 in net/http.HandlerFunc.ServeHTTP (f={void (net/http.ResponseWriter, net/http.Request *)} 0xc0004f33a8, w=..., r=0xc0000a2800) at /usr/local/go/src/net/http/server.go:2046
#18 0x00000000010f8091 in github.com/bmizerany/pat.(*PatternServeMux).ServeHTTP (p=0xc00012e150, w=..., r=0xc0000a2800)
    at /root/work/ndb-influxdb-instance/src/influxdb/influxdb-1.8.4/vendor/github.com/bmizerany/pat/mux.go:117
#19 0x0000000001296a79 in influxdb.cluster/services/httpd.(*Handler).ServeHTTP (h=0xc0001d8870, w=..., r=0xc0000a2800)
    at /root/work/ndb-influxdb-instance/src/influxdb/influxdb-1.8.4/services/httpd/handler.go:384
#20 0x00000000007f8c5a in net/http.serverHandler.ServeHTTP (sh=..., rw=..., req=0xc0000a2800) at /usr/local/go/src/net/http/server.go:2878
#21 0x00000000007f2bd8 in net/http.(*conn).serve (c=0xc00030e0a0, ctx=...) at /usr/local/go/src/net/http/server.go:1929
#22 0x00000000007f9b27 in net/http.(*Server).Serve·dwrap·82 () at /usr/local/go/src/net/http/server.go:3033
#23 0x0000000000470ac1 in runtime.goexit () at /usr/local/go/src/runtime/asm_amd64.s:1581
#24 0x0000000000000000 in ?? ()

关键函数:

httpd.(*Handler).serveWrite

// serveWrite receives incoming series data in line protocol format and writes it to the database.
func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user meta.User) {
	atomic.AddInt64(&h.stats.WriteRequests, 1)
	atomic.AddInt64(&h.stats.ActiveWriteRequests, 1)
	defer func(start time.Time) {
		atomic.AddInt64(&h.stats.ActiveWriteRequests, -1)
		atomic.AddInt64(&h.stats.WriteRequestDuration, time.Since(start).Nanoseconds())
	}(time.Now())
	h.requestTracker.Add(r, user)

	database := r.URL.Query().Get("db")
	if database == "" {
		h.httpError(w, "database is required", http.StatusBadRequest)
		return
	}

	if di := h.MetaClient.Database(database); di == nil {
		h.httpError(w, fmt.Sprintf("database not found: %q", database), http.StatusNotFound)
		return
	}

	if h.Config.AuthEnabled {
		if user == nil {
			h.httpError(w, fmt.Sprintf("user is required to write to database %q", database), http.StatusForbidden)
			return
		}

		if err := h.WriteAuthorizer.AuthorizeWrite(user.ID(), database); err != nil {
			h.httpError(w, fmt.Sprintf("%q user is not authorized to write to database %q", user.ID(), database), http.StatusForbidden)
			return
		}
	}

	body := r.Body
	if h.Config.MaxBodySize > 0 {
		body = truncateReader(body, int64(h.Config.MaxBodySize))
	}

	// Handle gzip decoding of the body
	if r.Header.Get("Content-Encoding") == "gzip" {
		b, err := gzip.NewReader(r.Body)
		if err != nil {
			h.httpError(w, err.Error(), http.StatusBadRequest)
			return
		}
		defer b.Close()
		body = b
	}

	var bs []byte
	if r.ContentLength > 0 {
		if h.Config.MaxBodySize > 0 && r.ContentLength > int64(h.Config.MaxBodySize) {
			h.httpError(w, http.StatusText(http.StatusRequestEntityTooLarge), http.StatusRequestEntityTooLarge)
			return
		}

		// This will just be an initial hint for the gzip reader, as the
		// bytes.Buffer will grow as needed when ReadFrom is called
		bs = make([]byte, 0, r.ContentLength)
	}
	buf := bytes.NewBuffer(bs)

	_, err := buf.ReadFrom(body)
	if err != nil {
		if err == errTruncated {
			h.httpError(w, http.StatusText(http.StatusRequestEntityTooLarge), http.StatusRequestEntityTooLarge)
			return
		}

		if h.Config.WriteTracing {
			h.Logger.Info("Write handler unable to read bytes from request body")
		}
		h.httpError(w, err.Error(), http.StatusBadRequest)
		return
	}
	atomic.AddInt64(&h.stats.WriteRequestBytesReceived, int64(buf.Len()))

	if h.Config.WriteTracing {
		h.Logger.Info("Write body received by handler", zap.ByteString("body", buf.Bytes()))
	}

	points, parseError := models.ParsePointsWithPrecision(buf.Bytes(), time.Now().UTC(), r.URL.Query().Get("precision"))
	// Not points parsed correctly so return the error now
	if parseError != nil && len(points) == 0 {
		if parseError.Error() == "EOF" {
			h.writeHeader(w, http.StatusOK)
			return
		}
		h.httpError(w, parseError.Error(), http.StatusBadRequest)
		return
	}

	// Determine required consistency level.
	level := r.URL.Query().Get("consistency")
	consistency := coordinator.ConsistencyLevelOne
	if level != "" {
		var err error
		consistency, err = coordinator.ParseConsistencyLevel(level)
		if err != nil {
			h.httpError(w, err.Error(), http.StatusBadRequest)
			return
		}
	}

	// Write points.
	if err := h.PointsWriter.WritePoints(database, r.URL.Query().Get("rp"), consistency, user, points); freetsdb.IsClientError(err) {
		atomic.AddInt64(&h.stats.PointsWrittenFail, int64(len(points)))
		h.httpError(w, err.Error(), http.StatusBadRequest)
		return
	} else if freetsdb.IsAuthorizationError(err) {
		atomic.AddInt64(&h.stats.PointsWrittenFail, int64(len(points)))
		h.httpError(w, err.Error(), http.StatusForbidden)
		return
	} else if werr, ok := err.(tsdb.PartialWriteError); ok {
		atomic.AddInt64(&h.stats.PointsWrittenOK, int64(len(points)-werr.Dropped))
		atomic.AddInt64(&h.stats.PointsWrittenDropped, int64(werr.Dropped))
		h.httpError(w, werr.Error(), http.StatusBadRequest)
		return
	} else if err != nil {
		atomic.AddInt64(&h.stats.PointsWrittenFail, int64(len(points)))
		h.httpError(w, err.Error(), http.StatusInternalServerError)
		return
	} else if parseError != nil {
		// We wrote some of the points
		atomic.AddInt64(&h.stats.PointsWrittenOK, int64(len(points)))
		// The other points failed to parse which means the client sent invalid line protocol.  We return a 400
		// response code as well as the lines that failed to parse.
		h.httpError(w, tsdb.PartialWriteError{Reason: parseError.Error()}.Error(), http.StatusBadRequest)
		return
	}

	atomic.AddInt64(&h.stats.PointsWrittenOK, int64(len(points)))
	h.writeHeader(w, http.StatusNoContent)
}
举报

相关推荐

0 条评论