Skip to content

Commit 38dc748

Browse files
committed
Merge branch 'branch-v0.2'
2 parents cff8021 + ed2f080 commit 38dc748

File tree

29 files changed

+608
-179
lines changed

29 files changed

+608
-179
lines changed

‎.gitignore‎

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/.idea
2-
/datatmp
3-
/tmp
4-
/data*
2+
data*
53
go.sum
4+
build/
5+
!distributed/data

‎README.md‎

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,9 @@ LISTEN_ADDRESS=:12345 STORAGE_ROOT=./tmp RABBITMQ_SERVER="amqp://test:test@local
6868

6969
The configuration content mentioned above needs to be configured on each node machine as needed.
7070

71-
Then you can find the corresponding binary files data.exe and interface.exe in the release. Run the two programs on different node machines through the command line.
71+
Then you can find the corresponding binary files data.exe and interface.exe in the release. Run the two programs on different node machines through the command line.
72+
73+
**After this project version v0.2, six data server nodes must be equipped, otherwise the service will be unavailable.**
7274

7375
You can compile it yourself with the source code.
7476

‎distributed/README.MD‎

Lines changed: 0 additions & 63 deletions
This file was deleted.

‎distributed/config/config.go‎

Lines changed: 105 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,58 @@
11
package config
22

33
import (
4+
"flag"
5+
"github.com/GuoYuefei/DOStorage1/distributed/utils"
6+
"gopkg.in/yaml.v2"
7+
"io/ioutil"
8+
"log"
49
"os"
10+
"os/exec"
11+
"path"
12+
"path/filepath"
513
)
614

15+
/**
16+
如果是相对位置,则是相对执行文件的位置
17+
*/
18+
var ConfigFile string
19+
720
var Pub *SPub
821
var ServerInf *SInf
922
var ServerData *SData
1023

24+
const usage string = "-c configFilePath"
25+
26+
type ServerType = int
27+
const (
28+
TypeSInf = iota
29+
TypeSData
30+
)
31+
32+
var ObjectRoot string
33+
var TempRoot string
34+
35+
func Init() {
36+
ObjectRoot = filepath.Join(ServerData.STORAGE_ROOT, "objects")
37+
TempRoot = filepath.Join(ServerData.STORAGE_ROOT, "temp")
38+
// make dir for ./data/objects
39+
_, e := os.Stat(ObjectRoot)
40+
if e != nil {
41+
e := os.MkdirAll(ObjectRoot, os.ModePerm)
42+
if e != nil {
43+
log.Fatal(e)
44+
}
45+
}
46+
47+
_, e = os.Stat(TempRoot)
48+
if e != nil {
49+
e := os.MkdirAll(TempRoot, os.ModePerm)
50+
if e != nil {
51+
log.Fatal(e)
52+
}
53+
}
54+
}
55+
1156
func init() {
1257

1358
Pub = &SPub{
@@ -17,44 +62,93 @@ func init() {
1762
ServerInf = &SInf{
1863
LISTEN_ADDRESS: "localhost:23333",
1964
}
65+
ServerInf.SPub = Pub
2066
ServerData = &SData{
2167
LISTEN_ADDRESS: "localhost:23334",
2268
STORAGE_ROOT: "./data",
2369
}
70+
ServerData.SPub = Pub
71+
72+
}
73+
74+
// serverType
75+
// TypeSInf interface server
76+
// TypeSData data server
77+
func ConfigParse(serverType ServerType) {
78+
79+
switch serverType {
80+
case TypeSInf:
81+
interfaceConfigFile, err := ioutil.ReadFile(path.Join(ConfigFile))
82+
if err == nil {
83+
yaml.Unmarshal(interfaceConfigFile, ServerInf)
84+
}
85+
utils.Log.Printf(utils.Debug, "after read config file, %v\n", ServerInf, Pub)
86+
case TypeSData:
87+
dataConfigFile, err := ioutil.ReadFile(path.Join(ConfigFile))
88+
if err == nil {
89+
yaml.Unmarshal(dataConfigFile, ServerData)
90+
}
91+
utils.Log.Printf(utils.Debug, "after read config file, %v\n", ServerData, Pub)
92+
default:
93+
utils.Log.Println(utils.Warning, "Server Type No MATCH!")
94+
}
2495

2596
if os.Getenv("RABBITMQ_SERVER") != "" {
2697
Pub.RABBITMQ_SERVER = os.Getenv("RABBITMQ_SERVER")
2798
}
2899
if os.Getenv("ES_SERVER") != "" {
29100
Pub.ES_SERVER = os.Getenv("ES_SERVER")
30101
}
31-
32102
if os.Getenv("LISTEN_ADDRESS") != "" {
33103
ServerData.LISTEN_ADDRESS = os.Getenv("LISTEN_ADDRESS")
34104
ServerInf.LISTEN_ADDRESS = ServerData.LISTEN_ADDRESS
35105
}
36-
37106
if os.Getenv("STORAGE_ROOT") != "" {
38107
ServerData.STORAGE_ROOT = os.Getenv("STORAGE_ROOT")
39108
}
109+
// 最后如果STORAGE_ROOT是相对位置的话,转成绝对路径
110+
if !filepath.IsAbs(ServerData.STORAGE_ROOT) {
111+
exePath, _ := exec.LookPath(os.Args[0])
112+
path, _ := filepath.Abs(filepath.Dir(exePath))
113+
ServerData.STORAGE_ROOT = filepath.Join(path, ServerData.STORAGE_ROOT)
114+
}
40115

41-
// todo config
42-
116+
// 配置结束后初始化
117+
Init()
118+
}
43119

120+
func Flags(serverType ServerType) {
121+
switch serverType {
122+
case TypeSInf:
123+
flag.StringVar(&ConfigFile, "c", "./config/interface.yml", usage)
124+
flag.StringVar(&ConfigFile, "-config", "./config/interface.yml", usage)
125+
case TypeSData:
126+
flag.StringVar(&ConfigFile, "c", "./config/data.yml", usage)
127+
flag.StringVar(&ConfigFile, "-config", "./config/data.yml", usage)
128+
default:
129+
utils.Log.Println(utils.Warning, "Server Type No MATCH!")
130+
}
131+
// 相对位置转换成绝对位置
132+
if !filepath.IsAbs(ConfigFile) {
133+
exePath, _ := exec.LookPath(os.Args[0])
134+
path, _ := filepath.Abs(filepath.Dir(exePath))
135+
ConfigFile = filepath.Join(path, ConfigFile)
136+
}
137+
utils.Log.Println(utils.Info, "Use config file: ", ConfigFile)
44138
}
45139

46140
type SPub struct {
47-
ES_SERVER string
48-
RABBITMQ_SERVER string
141+
ES_SERVER string `yaml:"es_server"`
142+
RABBITMQ_SERVER string `yaml:"rabbitmq_server"`
49143
}
50144

51145
type SInf struct {
52-
*SPub
53-
LISTEN_ADDRESS string
146+
*SPub `yaml:"public"`
147+
LISTEN_ADDRESS string `yaml:"listen_address"`
54148
}
55149

56150
type SData struct {
57-
*SPub
58-
LISTEN_ADDRESS string
59-
STORAGE_ROOT string
151+
*SPub `yaml:"public"`
152+
LISTEN_ADDRESS string `yaml:"listen_address"`
153+
STORAGE_ROOT string `yaml:"storage_root"`
60154
}

‎distributed/config/config_test.go‎

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package config
2+
3+
import (
4+
"fmt"
5+
"testing"
6+
)
7+
8+
func Test_Init(t *testing.T) {
9+
fmt.Println(Pub)
10+
fmt.Printf("%p\n", Pub)
11+
fmt.Println(ServerInf)
12+
fmt.Println(ServerData)
13+
}

‎distributed/config/data.yml‎

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
public:
2+
es_server: localhost:9200
3+
rabbitmq_server: amqp://test:test@localhost:5672/
4+
listen_address: localhost:23334
5+
storage_root: ./data

‎distributed/config/interface.yml‎

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
public:
2+
es_server: localhost:9200
3+
rabbitmq_server: amqp://test:test@localhost:5672/
4+
listen_address: localhost:23333

‎distributed/data/locate/locate.go‎

Lines changed: 27 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,51 +1,33 @@
11
package locate
22

33
import (
4+
"fmt"
45
"github.com/GuoYuefei/DOStorage1/distributed/config"
56
"github.com/GuoYuefei/DOStorage1/distributed/rabbitmq"
7+
"github.com/GuoYuefei/DOStorage1/distributed/types"
68
"github.com/GuoYuefei/DOStorage1/distributed/utils"
7-
"log"
8-
"os"
99
"path/filepath"
1010
"strconv"
11+
"strings"
1112
"sync"
1213
)
1314

1415
var objects map[string]int = make(map[string]int)
1516
var mutex sync.Mutex
1617

17-
var ObjectRoot = filepath.Join(config.ServerData.STORAGE_ROOT, "objects")
18-
var TempRoot = filepath.Join(config.ServerData.STORAGE_ROOT, "temp")
19-
20-
func init() {
21-
// make dir for ./data/objects
22-
_, e := os.Stat(ObjectRoot)
23-
if e != nil {
24-
e := os.MkdirAll(ObjectRoot, os.ModePerm)
25-
if e != nil {
26-
log.Fatal(e)
27-
}
28-
}
29-
30-
_, e = os.Stat(TempRoot)
31-
if e != nil {
32-
e := os.MkdirAll(TempRoot, os.ModePerm)
33-
if e != nil {
34-
log.Fatal(e)
35-
}
36-
}
37-
}
38-
39-
func Locate(hash string) bool {
18+
func Locate(hash string) int {
4019
mutex.Lock()
41-
_, ok := objects[hash]
20+
id, ok := objects[hash]
4221
mutex.Unlock()
43-
return ok
22+
if !ok {
23+
return -1
24+
}
25+
return id
4426
}
4527

46-
func Add(hash string) {
28+
func Add(hash string, id int) {
4729
mutex.Lock()
48-
objects[hash] = 1
30+
objects[hash] = id
4931
mutex.Unlock()
5032
}
5133

@@ -66,17 +48,26 @@ func StartLocate() {
6648
hash, err := strconv.Unquote(string(msg.Body))
6749
utils.PanicOnError(err, "Unquote error")
6850
utils.Log.Printf(utils.Debug, "get hash %s for locate\n", hash)
69-
if Locate(hash) {
70-
// 定位成功就发送自己的信息出去
71-
q.Send(msg.ReplyTo, config.ServerData.LISTEN_ADDRESS)
51+
id := Locate(hash)
52+
if id != -1 {
53+
q.Send(msg.ReplyTo, types.LocateMessage{Addr: config.ServerData.LISTEN_ADDRESS, Id: id})
7254
}
7355
}
7456
}
7557

7658
func CollectObjects() {
77-
files, _ := filepath.Glob(filepath.Join(ObjectRoot, "/*"))
59+
files, _ := filepath.Glob(filepath.Join(config.ObjectRoot, "/*"))
7860
for i := range files {
79-
hash := filepath.Base(files[i])
80-
objects[hash] = 1
61+
file := strings.Split(filepath.Base(files[i]), ".")
62+
if len(file) != 3 {
63+
utils.PanicOnError(fmt.Errorf("file name error, format error"), "")
64+
continue
65+
}
66+
hash := file[0]
67+
id, e := strconv.Atoi(file[1])
68+
if e != nil {
69+
utils.PanicOnError(fmt.Errorf("file name error. id is not a number"), "")
70+
}
71+
objects[hash] = id
8172
}
82-
}
73+
}

0 commit comments

Comments
 (0)