Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions Dockerfile.minimal
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
FROM golang:alpine AS build

WORKDIR /build

ENV GO111MODULE=on \
CGO_ENABLED=0 \
GOOS=linux

COPY . .

RUN go version

RUN mkdir -p /usr/local/apisix-seed/logs

RUN go build -o apisix-seed main.go

FROM scratch

WORKDIR /usr/local/apisix-seed

COPY --from=build /usr/local/apisix-seed/logs /usr/local/apisix-seed/logs

COPY --from=build /build/apisix-seed .
COPY --from=build /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/
COPY --from=build /build/conf/conf.yaml conf/conf.yaml

ENV PATH=$PATH:/usr/local/apisix-seed

ENV APISIX_SEED_WORKDIR /usr/local/apisix-seed

CMD [ "/usr/local/apisix-seed/apisix-seed" ]
8 changes: 1 addition & 7 deletions conf/conf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ etcd:
# the default value is true, e.g. the certificate will be verified strictly.
log:
level: warn
path: apisix-seed.log # path is the file to write logs to. Backup log files will be retained in the same directory
path: logs/apisix-seed.log # path is the file to write logs to. Backup log files will be retained in the same directory
maxage: 168h # maxage is the maximum number of days to retain old log files based on the timestamp encoded in their filename
maxsize: 104857600 # maxsize is the maximum size in megabytes of the log file before it gets rotated. It defaults to 100mb
rotation_time: 1h # rotation_time is the log rotation time
Expand All @@ -30,9 +30,3 @@ discovery: # service discovery center
connect: 2000 # default 2000ms
send: 2000 # default 2000ms
read: 5000 # default 5000ms
zookeeper:
hosts:
- "127.0.0.1:2181"
prefix: /zookeeper
weight: 100 # default weight for node
timeout: 10 # default 10s
25 changes: 25 additions & 0 deletions docker-compose-build.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
version: "3"

services:
apisix_seed_dev:
image: hugozhu/apisix-seed
build:
context: .
dockerfile: Dockerfile.minimal
restart: always
volumes:
- ./conf/conf.yaml:/usr/local/apisix-seed/conf/conf.yaml:ro
- ./logs:/usr/local/apisix-seed/logs
networks:
apisix-seed:
ipv4_address: 172.50.238.50

networks:
apisix-seed:
driver: bridge
ipam:
driver: default
config:
- subnet: 172.50.238.0/24
gateway: 172.50.238.1

83 changes: 78 additions & 5 deletions internal/core/message/a6conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,32 @@ package message

import (
"encoding/json"
"fmt"
"reflect"
"strconv"
"strings"

"github.com/api7/gopkg/pkg/log"
)

// Sample route config
// "labels":
// {
// "discovery_args.group_name": "group_name",
// "discovery_args.namespace_id": "test_name",
// "discovery_type": "nacos",
// "service_name": "test-service",
// "service_grpc_port": "10001"
// },

type Labels struct {
DiscoveryType string `json:"discovery_type,omitempty"`
ServiceName string `json:"service_name,omitempty"`
DiscoveryArgsNamespaceID string `json:"discovery_args.namespace_id,omitempty"`
DiscoveryArgsGroupName string `json:"discovery_args.group_name,omitempty"`
ServiceGrpcPort string `json:"service_grpc_port,omitempty"`
}

type UpstreamArg struct {
NamespaceID string `json:"namespace_id,omitempty"`
GroupName string `json:"group_name,omitempty"`
Expand Down Expand Up @@ -82,6 +104,7 @@ func embedElm(v reflect.Value, all map[string]interface{}) {
}

typ := v.Type()

fieldNum := typ.NumField()
for i := 0; i < fieldNum; i++ {
field := typ.Field(i)
Expand All @@ -102,10 +125,13 @@ func embedElm(v reflect.Value, all map[string]interface{}) {
continue
}

if fieldName == "DiscoveryType" || fieldName == "ServiceName" {
all["_"+tagName] = val.Interface()
delete(all, tagName)
continue
if fieldName == "DiscoveryType" || fieldName == "ServiceName" || fieldName == "DiscoveryArgs" {
name := typ.Name()
if name == "Upstream" {
// all["_"+tagName] = val.Interface()
delete(all, tagName)
continue
}
}

if val.Kind() == reflect.Ptr {
Expand Down Expand Up @@ -174,7 +200,14 @@ func NewUpstreams(value []byte) (A6Conf, error) {
return ups, nil
}

// "labels": {
// "service_name":"aquaman-user",
// "discovery_type":"nacos",
// "service_grpc_port":"10001"
// },

type Routes struct {
Labels Labels `json:"labels"`
Upstream Upstream `json:"upstream"`
All map[string]interface{} `json:"-"`
hasNodesAttr bool
Expand All @@ -185,9 +218,33 @@ func (routes *Routes) GetAll() *map[string]interface{} {
}

func (routes *Routes) Marshal() ([]byte, error) {
// If grpc port is configured, modify all nodes' port to grpc port
if routes.Labels.ServiceGrpcPort != "" && routes.Upstream.Nodes != nil {
grpcPort, err := strconv.ParseInt(routes.Labels.ServiceGrpcPort, 10, 64)
if err != nil {
log.Errorf("invalid grpc port configuration: failed to parse port %s to integer for route %s: %s", routes.Labels.ServiceGrpcPort, routes.All["id"], err)
return nil, fmt.Errorf("invalid grpc port configuration: failed to parse port %s to integer for route %s: %s", routes.Labels.ServiceGrpcPort, routes.All["id"], err)
}
if nodes, ok := routes.Upstream.Nodes.([]*Node); ok {
nodesCopy := make([]*Node, len(nodes))
for i, n := range nodes {
nodesCopy[i] = &Node{
Host: n.Host,
Weight: n.Weight,
Port: int(grpcPort),
}
log.Infof("updated gRPC port to %d for node %s in route %s", grpcPort, n.Host, routes.All["id"])
}
routes.Upstream.Nodes = nodesCopy
}
}

embedElm(reflect.ValueOf(routes), routes.All)

return json.Marshal(routes.All)
// routes.All["labels"] = routes.Labels
bytes, error := json.Marshal(routes.All)
// print("a6conf marshal 2=====", string(bytes))
return bytes, error
}

func (routes *Routes) Inject(nodes interface{}) {
Expand All @@ -206,14 +263,30 @@ func NewRoutes(value []byte) (A6Conf, error) {
routes := &Routes{
All: make(map[string]interface{}),
}

// println("===", string(value))

err := unmarshal(value, routes)
if err != nil {
return nil, err
}

if id, ok := routes.All["id"].(string); ok {
if routes.Labels.ServiceName != "" {
println("upstream nodes in route id: ", id, " will be synced with service: ", routes.Labels.ServiceName)
routes.Upstream.ServiceName = routes.Labels.ServiceName
routes.Upstream.DiscoveryType = routes.Labels.DiscoveryType
routes.Upstream.DiscoveryArgs = &UpstreamArg{
NamespaceID: routes.Labels.DiscoveryArgsNamespaceID,
GroupName: routes.Labels.DiscoveryArgsGroupName,
}
}
}

if routes.Upstream.Nodes != nil {
routes.hasNodesAttr = true
}

return routes, nil
}

Expand Down
77 changes: 72 additions & 5 deletions internal/core/message/a6conf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,11 +124,6 @@ func TestMarshal_Routes(t *testing.T) {
"pass_host": "pass",
"type": "roundrobin",
"hash_on": "vars",
"_discovery_type": "nacos",
"_service_name": "APISIX-NACOS",
"discovery_args": {
"group_name": "DEFAULT_GROUP"
},
"nodes": [
{
"host": "192.168.1.1",
Expand Down Expand Up @@ -157,6 +152,78 @@ func TestMarshal_Routes(t *testing.T) {
assert.JSONEq(t, wantA6Str, string(ss))
}

func TestMarshal_Routes_With_Grpc_Port(t *testing.T) {
givenA6Str := `{
"status": 1,
"id": "3",
"uri": "/hh",
"labels": {
"discovery_type": "nacos",
"discovery_args.group_name": "DEFAULT_GROUP",
"service_name": "test-service",
"service_grpc_port":"10001"
},
"upstream": {
"scheme": "http",
"pass_host": "pass",
"type": "roundrobin",
"hash_on": "vars",
"nodes": {
"192.168.1.1:10001": 1
}
},
"create_time": 1648871506,
"priority": 0,
"update_time": 1648871506
}`
nodes := []*Node{
{Host: "192.168.1.1", Port: 80, Weight: 1},
{Host: "192.168.1.2", Port: 80, Weight: 1},
}

wantA6Str := `{
"status": 1,
"id": "3",
"uri": "/hh",
"labels": {
"discovery_type": "nacos",
"discovery_args.group_name": "DEFAULT_GROUP",
"service_name": "test-service",
"service_grpc_port":"10001"
},
"upstream": {
"scheme": "http",
"pass_host": "pass",
"type": "roundrobin",
"hash_on": "vars",
"nodes": [
{
"host": "192.168.1.1",
"port": 10001,
"weight": 1
},
{
"host": "192.168.1.2",
"port": 10001,
"weight": 1
}
]
},
"create_time": 1648871506,
"priority": 0,
"update_time": 1648871506
}`
caseDesc := "sanity"
a6, err := NewA6Conf([]byte(givenA6Str), A6RoutesConf)
assert.Nil(t, err, caseDesc)

a6.Inject(nodes)
ss, err := a6.Marshal()
assert.Nil(t, err, caseDesc)

assert.JSONEq(t, wantA6Str, string(ss))
}

func TestHasNodesAttr_Routes(t *testing.T) {
tests := []struct {
name string
Expand Down
16 changes: 14 additions & 2 deletions internal/discoverer/nacos.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func (d *NacosDiscoverer) Stop() {

func (d *NacosDiscoverer) Query(msg *message.Message) error {
serviceId := serviceID(msg.ServiceName(), msg.DiscoveryArgs())

println("Nacos: ", msg.ServiceName(), msg.DiscoveryArgs())
d.cacheMutex.Lock()
defer d.cacheMutex.Unlock()

Expand All @@ -143,6 +143,10 @@ func (d *NacosDiscoverer) Query(msg *message.Message) error {
if err != nil {
return err
}
if len(nodes) == 0 {
log.Warnf("No nodes found for service[%s]", serviceId)
return fmt.Errorf("no nodes found for service[%s]", serviceId)
}

msg.InjectNodes(nodes)

Expand Down Expand Up @@ -201,7 +205,10 @@ func (d *NacosDiscoverer) Update(oldMsg, msg *message.Message) error {
if err != nil {
return err
}

if len(nodes) == 0 {
log.Warnf("No nodes found for service[%s]", serviceId)
return fmt.Errorf("no nodes found for service[%s]", serviceId)
}
msg.InjectNodes(nodes)
newDiscover.nodes = nodes
newDiscover.a6Conf = map[string]*message.Message{
Expand Down Expand Up @@ -310,6 +317,11 @@ func (d *NacosDiscoverer) newSubscribeCallback(serviceId string, metadata interf
})
}

if len(nodes) == 0 {
log.Warnf("No valid nodes found for service[%s]", serviceId)
return
}

d.cacheMutex.Lock()
defer d.cacheMutex.Unlock()

Expand Down
4 changes: 4 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,11 @@ func initLogger(logConf *conf.Log) error {
return nil
}

var VERSION = 1.1

func main() {
println("=========== version ", VERSION, " ============")

conf.InitConf()

if err := initLogger(conf.LogConfig); err != nil {
Expand Down